텐센트 클라우드 서버리스 API 데이터 수집 시스템 설계
시스템 아키텍처
이 시스템은 다음과 같은 구성요소로 이루어져 있습니다:
- API 수집 스케줄러 (SCF Timer Trigger)
- CKafka 메시지 큐
- API 수집 워커 (SCF)
- 데이터 저장 워커 (SCF)
- MySQL RDS
1. API 스케줄러
API 스케줄러는 정해진 시간에 API 작업을 분배하는 역할을 합니다. 아래는 스케줄러의 예시 코드입니다:
import json
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client
from datetime import datetime
def load_api_configs():
# API 설정 로드
return [
{"api_id": "1", "url": "https://api1.example.com/data"},
{"api_id": "2", "url": "https://api2.example.com/data"},
# ... 1000개의 API 설정
]
def main_handler(event, context):
# 현재 배치에서 처리할 API 목록 선택
api_configs = load_api_configs()
batch_size = 50 # 배치당 처리할 API 수
# API 작업을 CKafka로 분배
for i in range(0, len(api_configs), batch_size):
batch = api_configs[i:i+batch_size]
send_to_kafka(batch)
return {"statusCode": 200, "message": "작업이 분배되었습니다"}
2. API 수집 워커
API 수집 워커는 CKafka에서 메시지를 수신하여 API 데이터를 수집하고, 수집된 데이터를 저장 큐로 전송합니다. 아래는 수집 워커의 예시 코드입니다:
import json
import requests
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client
def main_handler(event, context):
# CKafka에서 메시지 수신
messages = event.get('Records', [])
for msg in messages:
try:
api_config = json.loads(msg['Ckafka']['value'])
response = fetch_api_data(api_config)
# 수집된 데이터를 저장 큐로 전송
send_to_storage_queue({
'api_id': api_config['api_id'],
'data': response,
'collected_at': datetime.now().isoformat()
})
except Exception as e:
handle_error(api_config, str(e))
def fetch_api_data(api_config):
response = requests.get(
api_config['url'],
headers={'Content-Type': 'application/json'},
timeout=5
)
return response.json()
3. 데이터 저장 워커
데이터 저장 워커는 수집된 데이터를 MySQL RDS에 저장합니다. 아래는 저장 워커의 예시 코드입니다:
import json
import pymysql
from datetime import datetime
def get_db_connection():
return pymysql.connect(
host=os.environ.get('DB_HOST'),
user=os.environ.get('DB_USER'),
password=os.environ.get('DB_PASSWORD'),
database=os.environ.get('DB_NAME'),
charset='utf8mb4'
)
def main_handler(event, context):
conn = get_db_connection()
cursor = conn.cursor()
try:
messages = event.get('Records', [])
for msg in messages:
data = json.loads(msg['Ckafka']['value'])
# 배치 인서트 구현
sql = """INSERT INTO collected_data
(api_id, data, collected_at)
VALUES (%s, %s, %s)"""
cursor.execute(sql, (
data['api_id'],
json.dumps(data['data']),
data['collected_at']
))
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
conn.close()
배포 설정
아래는 Serverless Framework를 사용한 배포 설정 예시입니다:
service: api-collector
provider:
name: tencent
region: ap-guangzhou
runtime: Python3.7
environment:
DB_HOST: ${env:DB_HOST}
DB_USER: ${env:DB_USER}
DB_PASSWORD: ${env:DB_PASSWORD}
DB_NAME: ${env:DB_NAME}
KAFKA_TOPIC: api-tasks
functions:
scheduler:
handler: scheduler.main_handler
events:
- timer:
name: api-scheduler
cron: "* * * * * * *"
memorySize: 128
timeout: 30
collector:
handler: collector.main_handler
events:
- ckafka:
name: api-collector
topic: api-tasks
maxMsgNum: 100
memorySize: 256
timeout: 60
storage:
handler: storage.main_handler
events:
- ckafka:
name: data-storage
topic: collected-data
maxMsgNum: 1000
memorySize: 512
timeout: 90
시스템 특징
- 분산 처리: 스케줄러가 API 작업을 작은 배치로 분할하고, 여러 수집기 함수가 병렬로 API 호출을 수행합니다. 저장 작업도 분산 처리되어 데이터베이스 부하를 분산합니다.
- 오류 처리: 각 단계별 재시도 메커니즘이 있으며, 실패한 작업은 별도 큐로 이동하여 후처리됩니다. 모니터링 및 알림 시스템과 연동됩니다.
- 성능 최적화: 배치 처리로 데이터베이스 연결을 최소화하고, 메모리 사용량에 따른 함수 리소스 할당이 이루어집니다. 타임아웃 설정으로 장애 전파를 방지합니다.
- 확장성: API 수가 증가해도 자동으로 확장되며, 부하에 따라 함수 인스턴스가 자동 스케일링됩니다. 모듈화된 구조로 유지보수가 용이합니다.
모니터링과 로깅
아래는 모니터링과 로깅을 위한 예시 코드입니다:
import logging
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 모니터링 지표
def record_metrics(api_id, response_time, status):
logger.info(f"API 호출 통계 - ID: {api_id}, 응답시간: {response_time}ms, 상태: {status}")
결론
이 아키텍처는 1000개의 API를 안정적으로 수집할 수 있으며, 필요에 따라 더 많은 API도 처리할 수 있도록 확장 가능합니다. 시스템은 우수한 오류 처리, 확장성 및 유지보수성을 갖추고 있습니다.