레이블이 mysql인 게시물을 표시합니다. 모든 게시물 표시
레이블이 mysql인 게시물을 표시합니다. 모든 게시물 표시

20250117

텐센트 클라우드 서버리스 API 데이터 수집 시스템 설계

 

텐센트 클라우드 서버리스 API 데이터 수집 시스템 설계

시스템 아키텍처

이 시스템은 다음과 같은 구성요소로 이루어져 있습니다:

  • API 수집 스케줄러 (SCF Timer Trigger)
  • CKafka 메시지 큐
  • API 수집 워커 (SCF)
  • 데이터 저장 워커 (SCF)
  • MySQL RDS

1. API 스케줄러

API 스케줄러는 정해진 시간에 API 작업을 분배하는 역할을 합니다. 아래는 스케줄러의 예시 코드입니다:

import json
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client
from datetime import datetime

def load_api_configs():
    # API 설정 로드
    return [
        {"api_id": "1", "url": "https://api1.example.com/data"},
        {"api_id": "2", "url": "https://api2.example.com/data"},
        # ... 1000개의 API 설정
    ]

def main_handler(event, context):
    # 현재 배치에서 처리할 API 목록 선택
    api_configs = load_api_configs()
    batch_size = 50  # 배치당 처리할 API 수
    
    # API 작업을 CKafka로 분배
    for i in range(0, len(api_configs), batch_size):
        batch = api_configs[i:i+batch_size]
        send_to_kafka(batch)
    
    return {"statusCode": 200, "message": "작업이 분배되었습니다"}

2. API 수집 워커

API 수집 워커는 CKafka에서 메시지를 수신하여 API 데이터를 수집하고, 수집된 데이터를 저장 큐로 전송합니다. 아래는 수집 워커의 예시 코드입니다:

import json
import requests
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client

def main_handler(event, context):
    # CKafka에서 메시지 수신
    messages = event.get('Records', [])
    
    for msg in messages:
        try:
            api_config = json.loads(msg['Ckafka']['value'])
            response = fetch_api_data(api_config)
            
            # 수집된 데이터를 저장 큐로 전송
            send_to_storage_queue({
                'api_id': api_config['api_id'],
                'data': response,
                'collected_at': datetime.now().isoformat()
            })
            
        except Exception as e:
            handle_error(api_config, str(e))

def fetch_api_data(api_config):
    response = requests.get(
        api_config['url'],
        headers={'Content-Type': 'application/json'},
        timeout=5
    )
    return response.json()

3. 데이터 저장 워커

데이터 저장 워커는 수집된 데이터를 MySQL RDS에 저장합니다. 아래는 저장 워커의 예시 코드입니다:

import json
import pymysql
from datetime import datetime

def get_db_connection():
    return pymysql.connect(
        host=os.environ.get('DB_HOST'),
        user=os.environ.get('DB_USER'),
        password=os.environ.get('DB_PASSWORD'),
        database=os.environ.get('DB_NAME'),
        charset='utf8mb4'
    )

def main_handler(event, context):
    conn = get_db_connection()
    cursor = conn.cursor()
    
    try:
        messages = event.get('Records', [])
        
        for msg in messages:
            data = json.loads(msg['Ckafka']['value'])
            
            # 배치 인서트 구현
            sql = """INSERT INTO collected_data 
                    (api_id, data, collected_at) 
                    VALUES (%s, %s, %s)"""
            
            cursor.execute(sql, (
                data['api_id'],
                json.dumps(data['data']),
                data['collected_at']
            ))
        
        conn.commit()
        
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        cursor.close()
        conn.close()

배포 설정

아래는 Serverless Framework를 사용한 배포 설정 예시입니다:

service: api-collector

provider:
  name: tencent
  region: ap-guangzhou
  runtime: Python3.7
  environment:
    DB_HOST: ${env:DB_HOST}
    DB_USER: ${env:DB_USER}
    DB_PASSWORD: ${env:DB_PASSWORD}
    DB_NAME: ${env:DB_NAME}
    KAFKA_TOPIC: api-tasks

functions:
  scheduler:
    handler: scheduler.main_handler
    events:
      - timer:
          name: api-scheduler
          cron: "* * * * * * *"
    memorySize: 128
    timeout: 30
    
  collector:
    handler: collector.main_handler
    events:
      - ckafka:
          name: api-collector
          topic: api-tasks
          maxMsgNum: 100
    memorySize: 256
    timeout: 60
    
  storage:
    handler: storage.main_handler
    events:
      - ckafka:
          name: data-storage
          topic: collected-data
          maxMsgNum: 1000
    memorySize: 512
    timeout: 90

시스템 특징

  • 분산 처리: 스케줄러가 API 작업을 작은 배치로 분할하고, 여러 수집기 함수가 병렬로 API 호출을 수행합니다. 저장 작업도 분산 처리되어 데이터베이스 부하를 분산합니다.
  • 오류 처리: 각 단계별 재시도 메커니즘이 있으며, 실패한 작업은 별도 큐로 이동하여 후처리됩니다. 모니터링 및 알림 시스템과 연동됩니다.
  • 성능 최적화: 배치 처리로 데이터베이스 연결을 최소화하고, 메모리 사용량에 따른 함수 리소스 할당이 이루어집니다. 타임아웃 설정으로 장애 전파를 방지합니다.
  • 확장성: API 수가 증가해도 자동으로 확장되며, 부하에 따라 함수 인스턴스가 자동 스케일링됩니다. 모듈화된 구조로 유지보수가 용이합니다.

모니터링과 로깅

아래는 모니터링과 로깅을 위한 예시 코드입니다:

import logging

# 로깅 설정
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

# 모니터링 지표
def record_metrics(api_id, response_time, status):
    logger.info(f"API 호출 통계 - ID: {api_id}, 응답시간: {response_time}ms, 상태: {status}")

이 아키텍처는 1000개의 API를 안정적으로 수집할 수 있으며, 필요에 따라 더 많은 API도 처리할 수 있도록 확장 가능합니다. 시스템은 우수한 오류 처리, 확장성 및 유지보수성을 갖추고 있습니다.

20250104

asyncio와 MySQL 라이브러리의 동시성 처리

 Python에서 asyncio를 사용하여 동시성 코드를 작성할 때, MySQL과 같은 데이터베이스 작업을 처리하는 경우가 많습니다. 하지만 일부 MySQL 드라이버(예: mysqlclient)는 블로킹 I/O 방식으로 동작하기 때문에, asyncio의 이벤트 루프에서 제대로 동작하지 않을 수 있습니다. 이 글에서는 이 문제를 해결하기 위한 방법과 다양한 테스트 결과를 공유합니다.

1. 문제 상황: 블로킹 I/O와 asyncio

1.1 블로킹 I/O란?

블로킹 I/O는 특정 작업(예: 데이터베이스 쿼리)이 완료될 때까지 호출한 프로그램이 멈춰 있는 방식입니다. 예를 들어, mysqlclient는 데이터베이스 작업을 수행할 때 호출한 스레드가 작업이 끝날 때까지 멈춥니다.

1.2 asyncio와의 충돌

asyncio는 비동기 이벤트 루프를 기반으로 동작하며, 코루틴(coroutine)을 통해 논블로킹 방식으로 작업을 처리합니다. 하지만 블로킹 I/O 라이브러리를 사용하면 asyncio의 이벤트 루프가 멈추고, 다른 코루틴이 실행되지 못하는 문제가 발생합니다.

2. 테스트 환경 및 목표

테스트 환경

  • Python 3.7.9
  • 라이브러리:
    • aiomysql (비동기 MySQL 드라이버)
    • mysqlclient (블로킹 MySQL 드라이버)

테스트 목표

  • 동일한 작업(3개의 SELECT SLEEP(X) 쿼리)을 각각의 방식으로 실행하여 성능과 동작 방식을 비교합니다.
  • 블로킹 문제를 해결하기 위해 run_in_executor() 및 기타 방법을 사용합니다.

3. 테스트 코드 및 결과

3.1 aiomysql + asyncio

코드

python
import asyncio import logging import aiomysql logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') async def sleep_async(delay): logging.info(f'{delay}-start') async with aiomysql.connect() as dbconn: async with dbconn.cursor() as cursor: await cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(sleep_async(2), sleep_async(4), sleep_async(6)))

결과

text
2020-11-04 12:46:04 [INFO] 2-start 2020-11-04 12:46:04 [INFO] 4-start 2020-11-04 12:46:04 [INFO] 6-start 2020-11-04 12:46:06 [INFO] 2-end 2020-11-04 12:46:08 [INFO] 4-end 2020-11-04 12:46:10 [INFO] 6-end

분석

  • 총 실행 시간은 약 6초.
  • 비동기 방식으로 잘 작동하며, 모든 쿼리가 동시에 실행됩니다.

3.2 mysqlclient + asyncio

코드

python
import asyncio from contextlib import closing import logging import MySQLdb as mysql logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') def sleep(delay): logging.info(f'{delay}-start') with closing(mysql.connect()) as dbconn: with dbconn.cursor() as cursor: cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') async def sleep_async(delay): return sleep(delay) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(sleep_async(2), sleep_async(4), sleep_async(6)))

결과

text
2020-11-04 12:47:30 [INFO] 2-start 2020-11-04 12:47:32 [INFO] 2-end 2020-11-04 12:47:32 [INFO] 4-start 2020-11-04 12:47:36 [INFO] 4-end 2020-11-04 12:47:36 [INFO] 6-start 2020-11-04 12:47:42 [INFO] 6-end

분석

  • 총 실행 시간은 약 12초.
  • 각 쿼리가 순차적으로 실행되며, 블로킹 문제가 발생했습니다.

3.3 mysqlclient + Thread

코드

python
from threading import Thread from contextlib import closing import logging import MySQLdb as mysql logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') def sleep(delay): logging.info(f'{delay}-start') with closing(mysql.connect()) as dbconn: with dbconn.cursor() as cursor: cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') threads = [Thread(target=sleep, args=(i,)) for i in range(2, 7, 2)] for thread in threads: thread.start() for thread in threads: thread.join()

결과

text
2020-11-04 12:49:02 [INFO] 2-start 2020-11-04 12:49:02 [INFO] 4-start 2020-11-04 12:49:02 [INFO] 6-start 2020-11-04 12:49:04 [INFO] 2-end 2020-11-04 12:49:06 [INFO] 4-end 2020-11-04 12:49:08 [INFO] 6-end

분석

총 실행 시간은 약 6초이며, 스레드를 사용하여 병렬 처리가 잘 이루어졌습니다.

3.4 mysqlclient + asyncio (run_in_executor)

코드 (기본 실행기 사용)

python
import asyncio from contextlib import closing import logging import MySQLdb as mysql logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') def sleep(delay): logging.info(f'{delay}-start') with closing(mysql.connect()) as dbconn: with dbconn.cursor() as cursor: cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') loop = asyncio.get_event_loop() futures = [loop.run_in_executor(None, sleep, i) for i in range(2, 7, 2)] loop.run_until_complete(asyncio.gather(*futures))

결과 (기본 실행기 사용)

text
2020-11-04 13:00:05 [INFO] 2-start 2020-11-04 13:00:05 [INFO] 4-start 2020-11-04 13:00:05 [INFO] 6-start 2020-11-04 13:00:07 [INFO] 2-end 2020-11-04 13:00:09 [INFO] 4-end 2020-11-04 13:00:11 [INFO] 6-end

분석 (기본 실행기 사용)

총 실행 시간은 약 6초이며, 블로킹 문제 없이 병렬 처리가 이루어졌습니다.

결론 및 권장 사항

  1. 최적의 선택
    • 가능한 경우 비동기 라이브러리인 aiomysql 또는 asyncmy를 사용하는 것이 가장 효율적입니다.
    • 하지만 성능이 중요한 경우 mysqlclient가 더 빠른 선택일 수 있습니다.
  2. 블로킹 라이브러리 처리
    • run_in_executor() 또는 Python >=3.9의 asyncio.to_thread()를 사용하여 블로킹 코드를 비동기로 전환할 수 있습니다.
    • 스레드풀 크기를 조정해 성능을 최적화할 수 있습니다.
  3. 드라이버 성능
    • 여러 벤치마크에 따르면 mysqlclient는 가장 빠른 MySQL 드라이버입니다.
    • 하지만 비동기 환경에서는 aiomysql이나 asyncmy가 더 적합합니다.