레이블이 scf인 게시물을 표시합니다. 모든 게시물 표시
레이블이 scf인 게시물을 표시합니다. 모든 게시물 표시

20250117

텐센트 클라우드 서버리스 API 데이터 수집 시스템 설계

 

텐센트 클라우드 서버리스 API 데이터 수집 시스템 설계

시스템 아키텍처

이 시스템은 다음과 같은 구성요소로 이루어져 있습니다:

  • API 수집 스케줄러 (SCF Timer Trigger)
  • CKafka 메시지 큐
  • API 수집 워커 (SCF)
  • 데이터 저장 워커 (SCF)
  • MySQL RDS

1. API 스케줄러

API 스케줄러는 정해진 시간에 API 작업을 분배하는 역할을 합니다. 아래는 스케줄러의 예시 코드입니다:

import json
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client
from datetime import datetime

def load_api_configs():
    # API 설정 로드
    return [
        {"api_id": "1", "url": "https://api1.example.com/data"},
        {"api_id": "2", "url": "https://api2.example.com/data"},
        # ... 1000개의 API 설정
    ]

def main_handler(event, context):
    # 현재 배치에서 처리할 API 목록 선택
    api_configs = load_api_configs()
    batch_size = 50  # 배치당 처리할 API 수
    
    # API 작업을 CKafka로 분배
    for i in range(0, len(api_configs), batch_size):
        batch = api_configs[i:i+batch_size]
        send_to_kafka(batch)
    
    return {"statusCode": 200, "message": "작업이 분배되었습니다"}

2. API 수집 워커

API 수집 워커는 CKafka에서 메시지를 수신하여 API 데이터를 수집하고, 수집된 데이터를 저장 큐로 전송합니다. 아래는 수집 워커의 예시 코드입니다:

import json
import requests
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client

def main_handler(event, context):
    # CKafka에서 메시지 수신
    messages = event.get('Records', [])
    
    for msg in messages:
        try:
            api_config = json.loads(msg['Ckafka']['value'])
            response = fetch_api_data(api_config)
            
            # 수집된 데이터를 저장 큐로 전송
            send_to_storage_queue({
                'api_id': api_config['api_id'],
                'data': response,
                'collected_at': datetime.now().isoformat()
            })
            
        except Exception as e:
            handle_error(api_config, str(e))

def fetch_api_data(api_config):
    response = requests.get(
        api_config['url'],
        headers={'Content-Type': 'application/json'},
        timeout=5
    )
    return response.json()

3. 데이터 저장 워커

데이터 저장 워커는 수집된 데이터를 MySQL RDS에 저장합니다. 아래는 저장 워커의 예시 코드입니다:

import json
import pymysql
from datetime import datetime

def get_db_connection():
    return pymysql.connect(
        host=os.environ.get('DB_HOST'),
        user=os.environ.get('DB_USER'),
        password=os.environ.get('DB_PASSWORD'),
        database=os.environ.get('DB_NAME'),
        charset='utf8mb4'
    )

def main_handler(event, context):
    conn = get_db_connection()
    cursor = conn.cursor()
    
    try:
        messages = event.get('Records', [])
        
        for msg in messages:
            data = json.loads(msg['Ckafka']['value'])
            
            # 배치 인서트 구현
            sql = """INSERT INTO collected_data 
                    (api_id, data, collected_at) 
                    VALUES (%s, %s, %s)"""
            
            cursor.execute(sql, (
                data['api_id'],
                json.dumps(data['data']),
                data['collected_at']
            ))
        
        conn.commit()
        
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        cursor.close()
        conn.close()

배포 설정

아래는 Serverless Framework를 사용한 배포 설정 예시입니다:

service: api-collector

provider:
  name: tencent
  region: ap-guangzhou
  runtime: Python3.7
  environment:
    DB_HOST: ${env:DB_HOST}
    DB_USER: ${env:DB_USER}
    DB_PASSWORD: ${env:DB_PASSWORD}
    DB_NAME: ${env:DB_NAME}
    KAFKA_TOPIC: api-tasks

functions:
  scheduler:
    handler: scheduler.main_handler
    events:
      - timer:
          name: api-scheduler
          cron: "* * * * * * *"
    memorySize: 128
    timeout: 30
    
  collector:
    handler: collector.main_handler
    events:
      - ckafka:
          name: api-collector
          topic: api-tasks
          maxMsgNum: 100
    memorySize: 256
    timeout: 60
    
  storage:
    handler: storage.main_handler
    events:
      - ckafka:
          name: data-storage
          topic: collected-data
          maxMsgNum: 1000
    memorySize: 512
    timeout: 90

시스템 특징

  • 분산 처리: 스케줄러가 API 작업을 작은 배치로 분할하고, 여러 수집기 함수가 병렬로 API 호출을 수행합니다. 저장 작업도 분산 처리되어 데이터베이스 부하를 분산합니다.
  • 오류 처리: 각 단계별 재시도 메커니즘이 있으며, 실패한 작업은 별도 큐로 이동하여 후처리됩니다. 모니터링 및 알림 시스템과 연동됩니다.
  • 성능 최적화: 배치 처리로 데이터베이스 연결을 최소화하고, 메모리 사용량에 따른 함수 리소스 할당이 이루어집니다. 타임아웃 설정으로 장애 전파를 방지합니다.
  • 확장성: API 수가 증가해도 자동으로 확장되며, 부하에 따라 함수 인스턴스가 자동 스케일링됩니다. 모듈화된 구조로 유지보수가 용이합니다.

모니터링과 로깅

아래는 모니터링과 로깅을 위한 예시 코드입니다:

import logging

# 로깅 설정
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

# 모니터링 지표
def record_metrics(api_id, response_time, status):
    logger.info(f"API 호출 통계 - ID: {api_id}, 응답시간: {response_time}ms, 상태: {status}")

결론

이 아키텍처는 1000개의 API를 안정적으로 수집할 수 있으며, 필요에 따라 더 많은 API도 처리할 수 있도록 확장 가능합니다. 시스템은 우수한 오류 처리, 확장성 및 유지보수성을 갖추고 있습니다.

Tencent Cloud SCF에서 Python RDB 연결

 

Tencent Cloud SCF에서 Python RDB 연결 가이드

기본 설정

필요한 패키지를 먼저 설치합니다:

pip install pymysql
pip install sqlalchemy

데이터베이스 연결 코드

기본 연결 설정:

import os
import pymysql
from sqlalchemy import create_engine

def get_db_connection():
    host = os.environ.get('DB_HOST')
    port = int(os.environ.get('DB_PORT', 3306))
    user = os.environ.get('DB_USER')
    password = os.environ.get('DB_PASSWORD')
    database = os.environ.get('DB_NAME')
    
    connection = pymysql.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=database,
        charset='utf8mb4'
    )
    return connection

SQLAlchemy 사용 예제:

def get_db_engine():
    DB_URL = f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}"
    engine = create_engine(DB_URL)
    return engine

def main_handler(event, context):
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        
        # 쿼리 실행
        cursor.execute("SELECT * FROM your_table")
        result = cursor.fetchall()
        
        # 변경사항 저장
        conn.commit()
        
        return {"statusCode": 200, "body": result}
    except Exception as e:
        return {"statusCode": 500, "body": str(e)}
    finally:
        cursor.close()
        conn.close()

보안 설정

VPC 설정

  • SCF 콘솔에서 VPC 설정
  • 보안그룹 설정
  • 데이터베이스와 동일한 VPC 선택

연결 풀링

커넥션 풀 구현:

from sqlalchemy.pool import QueuePool

engine = create_engine(
    DB_URL,
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=3600
)

오류 처리

재시도 로직 구현:

from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
import time

def execute_with_retry(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except pymysql.Error as e:
            if attempt == max_retries - 1:
                raise e
            time.sleep(2 ** attempt)

모니터링

로깅 설정:

import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def log_db_operation(operation):
    logger.info(f"Database operation: {operation}")
    
# 사용 예
log_db_operation("Connecting to database")

결론

이러한 설정으로 Tencent Cloud SCF에서 안정적인 RDB 연결과 쿼리 실행이 가능합니다. 연결 풀링을 통해 성능을 최적화하고, 적절한 오류 처리와 모니터링을 구현하는 것이 중요합니다.