20250117

텐센트 클라우드 서버리스 API 데이터 수집 시스템 설계

 

텐센트 클라우드 서버리스 API 데이터 수집 시스템 설계

시스템 아키텍처

이 시스템은 다음과 같은 구성요소로 이루어져 있습니다:

  • API 수집 스케줄러 (SCF Timer Trigger)
  • CKafka 메시지 큐
  • API 수집 워커 (SCF)
  • 데이터 저장 워커 (SCF)
  • MySQL RDS

1. API 스케줄러

API 스케줄러는 정해진 시간에 API 작업을 분배하는 역할을 합니다. 아래는 스케줄러의 예시 코드입니다:

import json
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client
from datetime import datetime

def load_api_configs():
    # API 설정 로드
    return [
        {"api_id": "1", "url": "https://api1.example.com/data"},
        {"api_id": "2", "url": "https://api2.example.com/data"},
        # ... 1000개의 API 설정
    ]

def main_handler(event, context):
    # 현재 배치에서 처리할 API 목록 선택
    api_configs = load_api_configs()
    batch_size = 50  # 배치당 처리할 API 수
    
    # API 작업을 CKafka로 분배
    for i in range(0, len(api_configs), batch_size):
        batch = api_configs[i:i+batch_size]
        send_to_kafka(batch)
    
    return {"statusCode": 200, "message": "작업이 분배되었습니다"}

2. API 수집 워커

API 수집 워커는 CKafka에서 메시지를 수신하여 API 데이터를 수집하고, 수집된 데이터를 저장 큐로 전송합니다. 아래는 수집 워커의 예시 코드입니다:

import json
import requests
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client

def main_handler(event, context):
    # CKafka에서 메시지 수신
    messages = event.get('Records', [])
    
    for msg in messages:
        try:
            api_config = json.loads(msg['Ckafka']['value'])
            response = fetch_api_data(api_config)
            
            # 수집된 데이터를 저장 큐로 전송
            send_to_storage_queue({
                'api_id': api_config['api_id'],
                'data': response,
                'collected_at': datetime.now().isoformat()
            })
            
        except Exception as e:
            handle_error(api_config, str(e))

def fetch_api_data(api_config):
    response = requests.get(
        api_config['url'],
        headers={'Content-Type': 'application/json'},
        timeout=5
    )
    return response.json()

3. 데이터 저장 워커

데이터 저장 워커는 수집된 데이터를 MySQL RDS에 저장합니다. 아래는 저장 워커의 예시 코드입니다:

import json
import pymysql
from datetime import datetime

def get_db_connection():
    return pymysql.connect(
        host=os.environ.get('DB_HOST'),
        user=os.environ.get('DB_USER'),
        password=os.environ.get('DB_PASSWORD'),
        database=os.environ.get('DB_NAME'),
        charset='utf8mb4'
    )

def main_handler(event, context):
    conn = get_db_connection()
    cursor = conn.cursor()
    
    try:
        messages = event.get('Records', [])
        
        for msg in messages:
            data = json.loads(msg['Ckafka']['value'])
            
            # 배치 인서트 구현
            sql = """INSERT INTO collected_data 
                    (api_id, data, collected_at) 
                    VALUES (%s, %s, %s)"""
            
            cursor.execute(sql, (
                data['api_id'],
                json.dumps(data['data']),
                data['collected_at']
            ))
        
        conn.commit()
        
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        cursor.close()
        conn.close()

배포 설정

아래는 Serverless Framework를 사용한 배포 설정 예시입니다:

service: api-collector

provider:
  name: tencent
  region: ap-guangzhou
  runtime: Python3.7
  environment:
    DB_HOST: ${env:DB_HOST}
    DB_USER: ${env:DB_USER}
    DB_PASSWORD: ${env:DB_PASSWORD}
    DB_NAME: ${env:DB_NAME}
    KAFKA_TOPIC: api-tasks

functions:
  scheduler:
    handler: scheduler.main_handler
    events:
      - timer:
          name: api-scheduler
          cron: "* * * * * * *"
    memorySize: 128
    timeout: 30
    
  collector:
    handler: collector.main_handler
    events:
      - ckafka:
          name: api-collector
          topic: api-tasks
          maxMsgNum: 100
    memorySize: 256
    timeout: 60
    
  storage:
    handler: storage.main_handler
    events:
      - ckafka:
          name: data-storage
          topic: collected-data
          maxMsgNum: 1000
    memorySize: 512
    timeout: 90

시스템 특징

  • 분산 처리: 스케줄러가 API 작업을 작은 배치로 분할하고, 여러 수집기 함수가 병렬로 API 호출을 수행합니다. 저장 작업도 분산 처리되어 데이터베이스 부하를 분산합니다.
  • 오류 처리: 각 단계별 재시도 메커니즘이 있으며, 실패한 작업은 별도 큐로 이동하여 후처리됩니다. 모니터링 및 알림 시스템과 연동됩니다.
  • 성능 최적화: 배치 처리로 데이터베이스 연결을 최소화하고, 메모리 사용량에 따른 함수 리소스 할당이 이루어집니다. 타임아웃 설정으로 장애 전파를 방지합니다.
  • 확장성: API 수가 증가해도 자동으로 확장되며, 부하에 따라 함수 인스턴스가 자동 스케일링됩니다. 모듈화된 구조로 유지보수가 용이합니다.

모니터링과 로깅

아래는 모니터링과 로깅을 위한 예시 코드입니다:

import logging

# 로깅 설정
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

# 모니터링 지표
def record_metrics(api_id, response_time, status):
    logger.info(f"API 호출 통계 - ID: {api_id}, 응답시간: {response_time}ms, 상태: {status}")

결론

이 아키텍처는 1000개의 API를 안정적으로 수집할 수 있으며, 필요에 따라 더 많은 API도 처리할 수 있도록 확장 가능합니다. 시스템은 우수한 오류 처리, 확장성 및 유지보수성을 갖추고 있습니다.

댓글 없음: