레이블이 mysql인 게시물을 표시합니다. 모든 게시물 표시
레이블이 mysql인 게시물을 표시합니다. 모든 게시물 표시

20250502

mysql json 인덱스

MySQL에서 JSON 인덱스 생성 및 활용 방법

MySQL에서 JSON 타입 컬럼의 특정 값을 빠르게 검색하려면, 해당 값에 대해 인덱스를 생성해야 합니다. JSON 전체에 직접 인덱스를 걸 수는 없고, JSON 내부의 특정 속성을 대상으로 인덱스를 생성하는 것이 일반적입니다.

1. Generated Column(가상/생성 열) + 인덱스

MySQL 5.7 이상에서는 Generated Column(생성 열, 가상 열 또는 저장 열)을 활용하여 JSON 속성을 별도 컬럼으로 추출한 뒤, 이 컬럼에 인덱스를 생성하는 방식이 가장 널리 사용됩니다23613.

예시

sql
-- 1. 가상 컬럼 추가 (예: user라는 키 값 추출) ALTER TABLE customers ADD COLUMN user VARCHAR(100) GENERATED ALWAYS AS (JSON_UNQUOTE(custinfo->'$.user')) STORED; -- 2. 인덱스 생성 ALTER TABLE customers ADD INDEX idx_user (user);
  • VIRTUAL: 실제 데이터 저장 없이 쿼리 시 계산 (디스크 공간 절약, 성능은 다소 낮을 수 있음)

  • STORED: 실제로 값을 저장 (조회 성능 우수, 저장 공간 필요)

쿼리 예시

sql
SELECT * FROM customers WHERE user = 'Jill';

이렇게 하면 해당 인덱스가 사용되어 빠른 검색이 가능합니다.

2. 함수 기반 인덱스(Functional Index)

MySQL 8.0.13 이상에서는 함수 기반 인덱스(Functional Index)를 직접 생성할 수 있습니다.
즉, JSON 추출 함수 결과에 바로 인덱스를 걸 수 있습니다168.

예시

sql
ALTER TABLE customers ADD INDEX idx_user ((CAST(custinfo->>'$.user' AS CHAR(100)) COLLATE utf8mb4_bin));
  • custinfo->>'$.user'는 JSON에서 user 값을 문자열로 추출합니다.

  • COLLATE utf8mb4_bin을 명시해야 인덱스와 쿼리의 정렬 방식이 일치하여 인덱스가 정상적으로 동작합니다1.

쿼리 예시

sql
SELECT * FROM customers WHERE custinfo->>'$.user' = 'Jill';

3. 인덱스 생성 시 주의사항

  • 정렬(COLLATION): 함수 기반 인덱스 생성 시, 인덱스의 collation과 쿼리의 collation이 일치해야 인덱스가 제대로 사용됩니다.
    예를 들어, COLLATE utf8mb4_bin을 명시적으로 지정해야 할 수 있습니다1.

  • 인덱스 오버헤드: 인덱스를 추가하면 검색은 빨라지지만, 데이터 추가/수정/삭제 시에는 인덱스 유지 비용이 발생합니다6.

  • JSON 전체 인덱스 불가: JSON 전체에 인덱스를 걸 수는 없고, 반드시 특정 속성(키)에 대해 인덱스를 생성해야 합니다11.

4. 참고: MySQL 8 이상 JSON 인덱스 예시

sql
ALTER TABLE my_table ADD INDEX idx_json_field ((json_column->>'$.key'));

이렇게 하면 json_columnkey 값에 대해 인덱스가 생성되고, WHERE 조건에서 해당 값을 검색할 때 인덱스를 사용할 수 있습니다689.

요약

  • 가상(생성) 컬럼을 만들고 인덱스 생성 → 가장 일반적, 호환성 높음

  • 함수 기반 인덱스로 바로 생성(MySQL 8.0.13 이상) → 더 간편

  • COLLATE 옵션 주의

  • JSON 전체가 아닌, 특정 키에만 인덱스 가능

이 방식을 적용하면 대용량 JSON 데이터에서도 원하는 값에 대한 빠른 검색이 가능합니다.

Citations:

  1. https://hoing.io/archives/78968
  2. https://iamwhat.tistory.com/65
  3. https://wildeveloperetrain.tistory.com/227
  4. https://ks2colorworld.tistory.com/20
  5. https://mysqldba.tistory.com/278
  6. https://wikidocs.net/226201
  7. https://kangworld.tistory.com/309
  8. https://rawshrimpsushi.tistory.com/78
  9. https://kti.or.kr/mysql%EC%97%90%EC%84%9C%EC%9D%98-json-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%B2%98%EB%A6%AC-%EB%B0%A9%EB%B2%95/
  10. https://accept.tistory.com/35
  11. https://devocean.sk.com/blog/techBoardDetail.do?ID=165191&boardType=techBlog
  12. https://oneoneone.kr/content/c4ee16d7
  13. https://knight76.tistory.com/entry/mysql-5714%EB%B6%80%ED%84%B0-%EC%A7%80%EC%9B%90%EB%90%98%EB%8A%94-json-%ED%95%84%EB%93%9C-%EC%9D%B8%EB%8D%B1%EC%8A%A4-%EC%A7%80%EC%9B%90
  14. https://velog.io/@jhbaik1501/MySQL-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EB%B6%84%EC%84%9D%EC%97%90%EC%84%9C-%EC%83%9D%EA%B8%B4-%EC%84%B1%EB%8A%A5-%EC%9D%B4%EC%8A%88-%ED%95%B4%EA%B2%B0
  15. https://jojoldu.tistory.com/565
  16. https://iamwhat.tistory.com/66
  17. https://wikidocs.net/226252

20250117

텐센트 클라우드 서버리스 API 데이터 수집 시스템 설계

 

텐센트 클라우드 서버리스 API 데이터 수집 시스템 설계

시스템 아키텍처

이 시스템은 다음과 같은 구성요소로 이루어져 있습니다:

  • API 수집 스케줄러 (SCF Timer Trigger)
  • CKafka 메시지 큐
  • API 수집 워커 (SCF)
  • 데이터 저장 워커 (SCF)
  • MySQL RDS

1. API 스케줄러

API 스케줄러는 정해진 시간에 API 작업을 분배하는 역할을 합니다. 아래는 스케줄러의 예시 코드입니다:

import json
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client
from datetime import datetime

def load_api_configs():
    # API 설정 로드
    return [
        {"api_id": "1", "url": "https://api1.example.com/data"},
        {"api_id": "2", "url": "https://api2.example.com/data"},
        # ... 1000개의 API 설정
    ]

def main_handler(event, context):
    # 현재 배치에서 처리할 API 목록 선택
    api_configs = load_api_configs()
    batch_size = 50  # 배치당 처리할 API 수
    
    # API 작업을 CKafka로 분배
    for i in range(0, len(api_configs), batch_size):
        batch = api_configs[i:i+batch_size]
        send_to_kafka(batch)
    
    return {"statusCode": 200, "message": "작업이 분배되었습니다"}

2. API 수집 워커

API 수집 워커는 CKafka에서 메시지를 수신하여 API 데이터를 수집하고, 수집된 데이터를 저장 큐로 전송합니다. 아래는 수집 워커의 예시 코드입니다:

import json
import requests
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client

def main_handler(event, context):
    # CKafka에서 메시지 수신
    messages = event.get('Records', [])
    
    for msg in messages:
        try:
            api_config = json.loads(msg['Ckafka']['value'])
            response = fetch_api_data(api_config)
            
            # 수집된 데이터를 저장 큐로 전송
            send_to_storage_queue({
                'api_id': api_config['api_id'],
                'data': response,
                'collected_at': datetime.now().isoformat()
            })
            
        except Exception as e:
            handle_error(api_config, str(e))

def fetch_api_data(api_config):
    response = requests.get(
        api_config['url'],
        headers={'Content-Type': 'application/json'},
        timeout=5
    )
    return response.json()

3. 데이터 저장 워커

데이터 저장 워커는 수집된 데이터를 MySQL RDS에 저장합니다. 아래는 저장 워커의 예시 코드입니다:

import json
import pymysql
from datetime import datetime

def get_db_connection():
    return pymysql.connect(
        host=os.environ.get('DB_HOST'),
        user=os.environ.get('DB_USER'),
        password=os.environ.get('DB_PASSWORD'),
        database=os.environ.get('DB_NAME'),
        charset='utf8mb4'
    )

def main_handler(event, context):
    conn = get_db_connection()
    cursor = conn.cursor()
    
    try:
        messages = event.get('Records', [])
        
        for msg in messages:
            data = json.loads(msg['Ckafka']['value'])
            
            # 배치 인서트 구현
            sql = """INSERT INTO collected_data 
                    (api_id, data, collected_at) 
                    VALUES (%s, %s, %s)"""
            
            cursor.execute(sql, (
                data['api_id'],
                json.dumps(data['data']),
                data['collected_at']
            ))
        
        conn.commit()
        
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        cursor.close()
        conn.close()

배포 설정

아래는 Serverless Framework를 사용한 배포 설정 예시입니다:

service: api-collector

provider:
  name: tencent
  region: ap-guangzhou
  runtime: Python3.7
  environment:
    DB_HOST: ${env:DB_HOST}
    DB_USER: ${env:DB_USER}
    DB_PASSWORD: ${env:DB_PASSWORD}
    DB_NAME: ${env:DB_NAME}
    KAFKA_TOPIC: api-tasks

functions:
  scheduler:
    handler: scheduler.main_handler
    events:
      - timer:
          name: api-scheduler
          cron: "* * * * * * *"
    memorySize: 128
    timeout: 30
    
  collector:
    handler: collector.main_handler
    events:
      - ckafka:
          name: api-collector
          topic: api-tasks
          maxMsgNum: 100
    memorySize: 256
    timeout: 60
    
  storage:
    handler: storage.main_handler
    events:
      - ckafka:
          name: data-storage
          topic: collected-data
          maxMsgNum: 1000
    memorySize: 512
    timeout: 90

시스템 특징

  • 분산 처리: 스케줄러가 API 작업을 작은 배치로 분할하고, 여러 수집기 함수가 병렬로 API 호출을 수행합니다. 저장 작업도 분산 처리되어 데이터베이스 부하를 분산합니다.
  • 오류 처리: 각 단계별 재시도 메커니즘이 있으며, 실패한 작업은 별도 큐로 이동하여 후처리됩니다. 모니터링 및 알림 시스템과 연동됩니다.
  • 성능 최적화: 배치 처리로 데이터베이스 연결을 최소화하고, 메모리 사용량에 따른 함수 리소스 할당이 이루어집니다. 타임아웃 설정으로 장애 전파를 방지합니다.
  • 확장성: API 수가 증가해도 자동으로 확장되며, 부하에 따라 함수 인스턴스가 자동 스케일링됩니다. 모듈화된 구조로 유지보수가 용이합니다.

모니터링과 로깅

아래는 모니터링과 로깅을 위한 예시 코드입니다:

import logging

# 로깅 설정
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

# 모니터링 지표
def record_metrics(api_id, response_time, status):
    logger.info(f"API 호출 통계 - ID: {api_id}, 응답시간: {response_time}ms, 상태: {status}")

이 아키텍처는 1000개의 API를 안정적으로 수집할 수 있으며, 필요에 따라 더 많은 API도 처리할 수 있도록 확장 가능합니다. 시스템은 우수한 오류 처리, 확장성 및 유지보수성을 갖추고 있습니다.

20250104

asyncio와 MySQL 라이브러리의 동시성 처리

 Python에서 asyncio를 사용하여 동시성 코드를 작성할 때, MySQL과 같은 데이터베이스 작업을 처리하는 경우가 많습니다. 하지만 일부 MySQL 드라이버(예: mysqlclient)는 블로킹 I/O 방식으로 동작하기 때문에, asyncio의 이벤트 루프에서 제대로 동작하지 않을 수 있습니다. 이 글에서는 이 문제를 해결하기 위한 방법과 다양한 테스트 결과를 공유합니다.

1. 문제 상황: 블로킹 I/O와 asyncio

1.1 블로킹 I/O란?

블로킹 I/O는 특정 작업(예: 데이터베이스 쿼리)이 완료될 때까지 호출한 프로그램이 멈춰 있는 방식입니다. 예를 들어, mysqlclient는 데이터베이스 작업을 수행할 때 호출한 스레드가 작업이 끝날 때까지 멈춥니다.

1.2 asyncio와의 충돌

asyncio는 비동기 이벤트 루프를 기반으로 동작하며, 코루틴(coroutine)을 통해 논블로킹 방식으로 작업을 처리합니다. 하지만 블로킹 I/O 라이브러리를 사용하면 asyncio의 이벤트 루프가 멈추고, 다른 코루틴이 실행되지 못하는 문제가 발생합니다.

2. 테스트 환경 및 목표

테스트 환경

  • Python 3.7.9
  • 라이브러리:
    • aiomysql (비동기 MySQL 드라이버)
    • mysqlclient (블로킹 MySQL 드라이버)

테스트 목표

  • 동일한 작업(3개의 SELECT SLEEP(X) 쿼리)을 각각의 방식으로 실행하여 성능과 동작 방식을 비교합니다.
  • 블로킹 문제를 해결하기 위해 run_in_executor() 및 기타 방법을 사용합니다.

3. 테스트 코드 및 결과

3.1 aiomysql + asyncio

코드

python
import asyncio import logging import aiomysql logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') async def sleep_async(delay): logging.info(f'{delay}-start') async with aiomysql.connect() as dbconn: async with dbconn.cursor() as cursor: await cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(sleep_async(2), sleep_async(4), sleep_async(6)))

결과

text
2020-11-04 12:46:04 [INFO] 2-start 2020-11-04 12:46:04 [INFO] 4-start 2020-11-04 12:46:04 [INFO] 6-start 2020-11-04 12:46:06 [INFO] 2-end 2020-11-04 12:46:08 [INFO] 4-end 2020-11-04 12:46:10 [INFO] 6-end

분석

  • 총 실행 시간은 약 6초.
  • 비동기 방식으로 잘 작동하며, 모든 쿼리가 동시에 실행됩니다.

3.2 mysqlclient + asyncio

코드

python
import asyncio from contextlib import closing import logging import MySQLdb as mysql logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') def sleep(delay): logging.info(f'{delay}-start') with closing(mysql.connect()) as dbconn: with dbconn.cursor() as cursor: cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') async def sleep_async(delay): return sleep(delay) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(sleep_async(2), sleep_async(4), sleep_async(6)))

결과

text
2020-11-04 12:47:30 [INFO] 2-start 2020-11-04 12:47:32 [INFO] 2-end 2020-11-04 12:47:32 [INFO] 4-start 2020-11-04 12:47:36 [INFO] 4-end 2020-11-04 12:47:36 [INFO] 6-start 2020-11-04 12:47:42 [INFO] 6-end

분석

  • 총 실행 시간은 약 12초.
  • 각 쿼리가 순차적으로 실행되며, 블로킹 문제가 발생했습니다.

3.3 mysqlclient + Thread

코드

python
from threading import Thread from contextlib import closing import logging import MySQLdb as mysql logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') def sleep(delay): logging.info(f'{delay}-start') with closing(mysql.connect()) as dbconn: with dbconn.cursor() as cursor: cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') threads = [Thread(target=sleep, args=(i,)) for i in range(2, 7, 2)] for thread in threads: thread.start() for thread in threads: thread.join()

결과

text
2020-11-04 12:49:02 [INFO] 2-start 2020-11-04 12:49:02 [INFO] 4-start 2020-11-04 12:49:02 [INFO] 6-start 2020-11-04 12:49:04 [INFO] 2-end 2020-11-04 12:49:06 [INFO] 4-end 2020-11-04 12:49:08 [INFO] 6-end

분석

총 실행 시간은 약 6초이며, 스레드를 사용하여 병렬 처리가 잘 이루어졌습니다.

3.4 mysqlclient + asyncio (run_in_executor)

코드 (기본 실행기 사용)

python
import asyncio from contextlib import closing import logging import MySQLdb as mysql logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') def sleep(delay): logging.info(f'{delay}-start') with closing(mysql.connect()) as dbconn: with dbconn.cursor() as cursor: cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') loop = asyncio.get_event_loop() futures = [loop.run_in_executor(None, sleep, i) for i in range(2, 7, 2)] loop.run_until_complete(asyncio.gather(*futures))

결과 (기본 실행기 사용)

text
2020-11-04 13:00:05 [INFO] 2-start 2020-11-04 13:00:05 [INFO] 4-start 2020-11-04 13:00:05 [INFO] 6-start 2020-11-04 13:00:07 [INFO] 2-end 2020-11-04 13:00:09 [INFO] 4-end 2020-11-04 13:00:11 [INFO] 6-end

분석 (기본 실행기 사용)

총 실행 시간은 약 6초이며, 블로킹 문제 없이 병렬 처리가 이루어졌습니다.

결론 및 권장 사항

  1. 최적의 선택
    • 가능한 경우 비동기 라이브러리인 aiomysql 또는 asyncmy를 사용하는 것이 가장 효율적입니다.
    • 하지만 성능이 중요한 경우 mysqlclient가 더 빠른 선택일 수 있습니다.
  2. 블로킹 라이브러리 처리
    • run_in_executor() 또는 Python >=3.9의 asyncio.to_thread()를 사용하여 블로킹 코드를 비동기로 전환할 수 있습니다.
    • 스레드풀 크기를 조정해 성능을 최적화할 수 있습니다.
  3. 드라이버 성능
    • 여러 벤치마크에 따르면 mysqlclient는 가장 빠른 MySQL 드라이버입니다.
    • 하지만 비동기 환경에서는 aiomysql이나 asyncmy가 더 적합합니다.