레이블이 python인 게시물을 표시합니다. 모든 게시물 표시
레이블이 python인 게시물을 표시합니다. 모든 게시물 표시

20250117

텐센트 클라우드 서버리스 API 데이터 수집 시스템 설계

 

텐센트 클라우드 서버리스 API 데이터 수집 시스템 설계

시스템 아키텍처

이 시스템은 다음과 같은 구성요소로 이루어져 있습니다:

  • API 수집 스케줄러 (SCF Timer Trigger)
  • CKafka 메시지 큐
  • API 수집 워커 (SCF)
  • 데이터 저장 워커 (SCF)
  • MySQL RDS

1. API 스케줄러

API 스케줄러는 정해진 시간에 API 작업을 분배하는 역할을 합니다. 아래는 스케줄러의 예시 코드입니다:

import json
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client
from datetime import datetime

def load_api_configs():
    # API 설정 로드
    return [
        {"api_id": "1", "url": "https://api1.example.com/data"},
        {"api_id": "2", "url": "https://api2.example.com/data"},
        # ... 1000개의 API 설정
    ]

def main_handler(event, context):
    # 현재 배치에서 처리할 API 목록 선택
    api_configs = load_api_configs()
    batch_size = 50  # 배치당 처리할 API 수
    
    # API 작업을 CKafka로 분배
    for i in range(0, len(api_configs), batch_size):
        batch = api_configs[i:i+batch_size]
        send_to_kafka(batch)
    
    return {"statusCode": 200, "message": "작업이 분배되었습니다"}

2. API 수집 워커

API 수집 워커는 CKafka에서 메시지를 수신하여 API 데이터를 수집하고, 수집된 데이터를 저장 큐로 전송합니다. 아래는 수집 워커의 예시 코드입니다:

import json
import requests
import os
from tencentcloud.common import credential
from tencentcloud.ckafka.v20190819 import ckafka_client

def main_handler(event, context):
    # CKafka에서 메시지 수신
    messages = event.get('Records', [])
    
    for msg in messages:
        try:
            api_config = json.loads(msg['Ckafka']['value'])
            response = fetch_api_data(api_config)
            
            # 수집된 데이터를 저장 큐로 전송
            send_to_storage_queue({
                'api_id': api_config['api_id'],
                'data': response,
                'collected_at': datetime.now().isoformat()
            })
            
        except Exception as e:
            handle_error(api_config, str(e))

def fetch_api_data(api_config):
    response = requests.get(
        api_config['url'],
        headers={'Content-Type': 'application/json'},
        timeout=5
    )
    return response.json()

3. 데이터 저장 워커

데이터 저장 워커는 수집된 데이터를 MySQL RDS에 저장합니다. 아래는 저장 워커의 예시 코드입니다:

import json
import pymysql
from datetime import datetime

def get_db_connection():
    return pymysql.connect(
        host=os.environ.get('DB_HOST'),
        user=os.environ.get('DB_USER'),
        password=os.environ.get('DB_PASSWORD'),
        database=os.environ.get('DB_NAME'),
        charset='utf8mb4'
    )

def main_handler(event, context):
    conn = get_db_connection()
    cursor = conn.cursor()
    
    try:
        messages = event.get('Records', [])
        
        for msg in messages:
            data = json.loads(msg['Ckafka']['value'])
            
            # 배치 인서트 구현
            sql = """INSERT INTO collected_data 
                    (api_id, data, collected_at) 
                    VALUES (%s, %s, %s)"""
            
            cursor.execute(sql, (
                data['api_id'],
                json.dumps(data['data']),
                data['collected_at']
            ))
        
        conn.commit()
        
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        cursor.close()
        conn.close()

배포 설정

아래는 Serverless Framework를 사용한 배포 설정 예시입니다:

service: api-collector

provider:
  name: tencent
  region: ap-guangzhou
  runtime: Python3.7
  environment:
    DB_HOST: ${env:DB_HOST}
    DB_USER: ${env:DB_USER}
    DB_PASSWORD: ${env:DB_PASSWORD}
    DB_NAME: ${env:DB_NAME}
    KAFKA_TOPIC: api-tasks

functions:
  scheduler:
    handler: scheduler.main_handler
    events:
      - timer:
          name: api-scheduler
          cron: "* * * * * * *"
    memorySize: 128
    timeout: 30
    
  collector:
    handler: collector.main_handler
    events:
      - ckafka:
          name: api-collector
          topic: api-tasks
          maxMsgNum: 100
    memorySize: 256
    timeout: 60
    
  storage:
    handler: storage.main_handler
    events:
      - ckafka:
          name: data-storage
          topic: collected-data
          maxMsgNum: 1000
    memorySize: 512
    timeout: 90

시스템 특징

  • 분산 처리: 스케줄러가 API 작업을 작은 배치로 분할하고, 여러 수집기 함수가 병렬로 API 호출을 수행합니다. 저장 작업도 분산 처리되어 데이터베이스 부하를 분산합니다.
  • 오류 처리: 각 단계별 재시도 메커니즘이 있으며, 실패한 작업은 별도 큐로 이동하여 후처리됩니다. 모니터링 및 알림 시스템과 연동됩니다.
  • 성능 최적화: 배치 처리로 데이터베이스 연결을 최소화하고, 메모리 사용량에 따른 함수 리소스 할당이 이루어집니다. 타임아웃 설정으로 장애 전파를 방지합니다.
  • 확장성: API 수가 증가해도 자동으로 확장되며, 부하에 따라 함수 인스턴스가 자동 스케일링됩니다. 모듈화된 구조로 유지보수가 용이합니다.

모니터링과 로깅

아래는 모니터링과 로깅을 위한 예시 코드입니다:

import logging

# 로깅 설정
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

# 모니터링 지표
def record_metrics(api_id, response_time, status):
    logger.info(f"API 호출 통계 - ID: {api_id}, 응답시간: {response_time}ms, 상태: {status}")

이 아키텍처는 1000개의 API를 안정적으로 수집할 수 있으며, 필요에 따라 더 많은 API도 처리할 수 있도록 확장 가능합니다. 시스템은 우수한 오류 처리, 확장성 및 유지보수성을 갖추고 있습니다.

Tencent Cloud SCF에서 Python RDB 연결

 

Tencent Cloud SCF에서 Python RDB 연결 가이드

기본 설정

필요한 패키지를 먼저 설치합니다:

pip install pymysql
pip install sqlalchemy

데이터베이스 연결 코드

기본 연결 설정:

import os
import pymysql
from sqlalchemy import create_engine

def get_db_connection():
    host = os.environ.get('DB_HOST')
    port = int(os.environ.get('DB_PORT', 3306))
    user = os.environ.get('DB_USER')
    password = os.environ.get('DB_PASSWORD')
    database = os.environ.get('DB_NAME')
    
    connection = pymysql.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=database,
        charset='utf8mb4'
    )
    return connection

SQLAlchemy 사용 예제:

def get_db_engine():
    DB_URL = f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}"
    engine = create_engine(DB_URL)
    return engine

def main_handler(event, context):
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        
        # 쿼리 실행
        cursor.execute("SELECT * FROM your_table")
        result = cursor.fetchall()
        
        # 변경사항 저장
        conn.commit()
        
        return {"statusCode": 200, "body": result}
    except Exception as e:
        return {"statusCode": 500, "body": str(e)}
    finally:
        cursor.close()
        conn.close()

보안 설정

VPC 설정

  • SCF 콘솔에서 VPC 설정
  • 보안그룹 설정
  • 데이터베이스와 동일한 VPC 선택

연결 풀링

커넥션 풀 구현:

from sqlalchemy.pool import QueuePool

engine = create_engine(
    DB_URL,
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=3600
)

오류 처리

재시도 로직 구현:

from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
import time

def execute_with_retry(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except pymysql.Error as e:
            if attempt == max_retries - 1:
                raise e
            time.sleep(2 ** attempt)

모니터링

로깅 설정:

import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def log_db_operation(operation):
    logger.info(f"Database operation: {operation}")
    
# 사용 예
log_db_operation("Connecting to database")


이러한 설정으로 Tencent Cloud SCF에서 안정적인 RDB 연결과 쿼리 실행이 가능합니다. 연결 풀링을 통해 성능을 최적화하고, 적절한 오류 처리와 모니터링을 구현하는 것이 중요합니다.

20250115

Streamlit : 이미지 분류 및 데이터 시각화 앱 만들기 (Mac 환경)

1. 개발 환경 설정

Python 및 Streamlit 설치

  1. Python 설치: Python 3.8 이상이 필요합니다. Mac에서 Python이 설치되어 있지 않다면, Python 공식 웹사이트에서 다운로드
  2. 가상 환경 생성:
    python3 -m venv streamlit_env
    source streamlit_env/bin/activate
  3. 필요한 라이브러리 설치:
    pip install streamlit pandas numpy matplotlib tensorflow_hub pillow

Streamlit 설치 확인

Streamlit이 올바르게 설치되었는지 확인하려면 다음 명령을 실행

streamlit hello

브라우저에 Streamlit 데모 앱이 열리면 성공적으로 설치된 것입니다.

2. Streamlit 앱 코드 작성

아래 코드는 이미지 분류와 데이터 시각화를 포함한 Streamlit 애플리케이션입니다.

import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow_hub as hub
from PIL import Image

# 앱 제목 및 설명
st.title("이미지 분류 및 데이터 시각화 앱")
st.write("이 앱은 업로드된 이미지를 분류하고 데이터를 시각화합니다.")

# 이미지 업로드 및 분류
uploaded_file = st.file_uploader("이미지를 업로드하세요", type=["jpg", "png", "jpeg"])
if uploaded_file is not None:
    image = Image.open(uploaded_file)
    st.image(image, caption="업로드된 이미지", use_column_width=True)
    st.write("이미지를 분류 중입니다...")

    # TensorFlow Hub에서 사전 학습된 모델 로드
    model = hub.load("https://tfhub.dev/google/imagenet/mobilenet_v2_100_224/classification/5")
    image = image.resize((224, 224))
    image_array = np.array(image) / 255.0  # 정규화
    image_array = np.expand_dims(image_array, axis=0)

    predictions = model(image_array)
    st.write("분류 결과:", predictions.numpy())

# 데이터 시각화
st.header("데이터 시각화")
data = pd.DataFrame(
    np.random.randn(100, 3),
    columns=['특성 A', '특성 B', '특성 C']
)
st.line_chart(data)

# 사이드바 예제
st.sidebar.header("사이드바 설정")
range_slider = st.sidebar.slider("값 범위 선택", 0, 100, (25, 75))
st.sidebar.write(f"선택된 범위: {range_slider}")

3. 코드 저장

위 코드를 image_classification_app.py라는 이름으로 저장하세요:

nano image_classification_app.py

4. 앱 실행

  1. 디렉토리 이동:
    cd /path/to/your/directory
  2. Streamlit 앱 실행:
    streamlit run image_classification_app.py
    브라우저에서 http://localhost:8501로 이동하여 앱을 확인합니다.

5. 주요 기능 설명

  • A. 이미지 업로드 및 분류: 사용자가 이미지를 업로드하면 TensorFlow Hub의 사전 학습된 MobileNet V2 모델을 사용하여 이미지를 분류합니다.
  • B. 데이터 시각화: 무작위로 생성된 데이터를 Pandas DataFrame으로 변환하고 이를 라인 차트로 시각화합니다.
  • C. 사이드바: 사이드바에 슬라이더를 추가하여 사용자 입력을 받을 수 있습니다.

6. 실행 결과

앱 실행 후 다음과 같은 기능을 테스트할 수 있습니다:

  • 이미지를 업로드하고 분류 결과를 확인합니다.
  • 데이터가 라인 차트로 표시되는 것을 봅니다.
  • 사이드바 슬라이더를 조작하여 선택된 범위를 변경합니다.

7. 추가 팁

  • TensorFlow Hub 모델 외에도 다양한 사전 학습 모델을 사용할 수 있습니다.
  • @st.cache 데코레이터를 활용해 데이터 로딩 속도를 최적화할 수 있습니다.
  • Streamlit Cloud를 사용해 앱을 쉽게 배포할 수 있습니다.


20250104

Python Backend와 ASGI 서버 구축: Uvicorn, Gunicorn, FastAPI, Vibora

 Python으로 백엔드 개발을 시작하거나 ASGI(Asynchronous Server Gateway Interface) 환경을 구축하려는 개발자를 위해 WSGI와 ASGI의 차이ASGI 서버 구성 방법, 그리고 FastAPI를 활용한 API 개발을 중심으로 정리했습니다. 

1. Backend 구성: WSGI와 ASGI

1.1 웹 서버(Web Server)

웹 서버는 크게 두 가지 역할을 합니다:
  1. 정적 요청 처리: HTML, CSS, 이미지 등 정적 파일을 클라이언트에 제공.
  2. 동적 요청 전달: Reverse Proxy를 통해 어플리케이션 서버로 동적 요청 전달(로드 밸런싱 포함).

주요 웹 서버

  • Nginx: 고성능 HTTP 및 Reverse Proxy 서버.
  • Apache: 가장 널리 사용되는 HTTP 서버.
  • Caddy: Go로 개발된 간단한 자동 HTTPS 지원 웹 서버.
웹 서버는 어플리케이션 서버와 TCP 또는 Unix Socket으로 연결하여 성능을 최적화할 수 있습니다.

1.2 WSGI와 ASGI

WSGI(Web Server Gateway Interface)

  • Python의 동기 웹 애플리케이션과 웹 서버 간의 표준 인터페이스.
  • Flask, Django 등 기존의 동기 기반 프레임워크에서 사용.

ASGI(Asynchronous Server Gateway Interface)

  • WSGI의 계승자로, 비동기 Python 웹 애플리케이션과 웹 서버 간의 표준 인터페이스.
  • WebSocket, HTTP/2, 비동기 작업 등을 지원.
  • 동기 및 비동기 앱 모두를 처리 가능.

WSGI와 ASGI의 차이

특징WSGIASGI
동시성 모델동기비동기
WebSocket 지원XO
HTTP/2 지원XO
배경 작업 및 장시간 연결제한적O

1.3 Gunicorn과 Uvicorn의 역할

아키텍처에서 Gunicorn과 Uvicorn은 다음과 같은 역할을 수행합니다:
  1. Gunicorn (WSGI/ASGI 프로세스 매니저):
    • Master 프로세스를 실행하고 여러 Worker 프로세스를 관리.
    • Worker로 Uvicorn을 실행하여 멀티 워커 환경 구성 가능.
  2. Uvicorn (ASGI 서버):
    • 단일 워커로 작동하며 비동기 처리를 담당.
    • Gunicorn과 함께 사용하면 안정적인 멀티 워커 환경을 제공.

2. FastAPI: 현대적인 고성능 API 프레임워크

2.1 FastAPI란?

FastAPI는 Python 3.6+를 기반으로 한 현대적이고 고성능의 API 프레임워크입니다. Starlette(ASGI 툴킷)과 Pydantic(데이터 검증 및 설정 관리)을 기반으로 하며, Node.js나 Go에 필적하는 성능을 제공합니다.

주요 특징

  • 자동화된 문서화: Swagger UI 및 ReDoc 지원.
  • Python 타입 힌트 기반: 코드 가독성과 유지보수성 향상.
  • 고성능: Uvicorn + Starlette 조합으로 높은 처리량 제공.

2.2 FastAPI 개발환경 구성

1) 가상환경 생성 및 패키지 설치

bash
conda create -n web python=3.9 conda install poetry poetry new hello-api poetry add fastapi 'uvicorn[standard]'

2) 예제 API 작성

./hello_api/main.py 파일에 아래 코드를 작성합니다:
python
from typing import Optional from fastapi import FastAPI app = FastAPI() @app.get("/") def read_root(): return {"Hello": "World"} @app.get("/items/{item_id}") def read_item(item_id: int, q: Optional[str] = None): return {"item_id": item_id, "q": q}

3) Uvicorn 실행

bash
uvicorn hello_api.main:app --reload
웹 브라우저에서 http://127.0.0.1:8000/docs에 접속하면 Swagger UI를 통해 API 문서를 확인할 수 있습니다.

2.3 Docker로 FastAPI 배포하기

Dockerfile 작성

text
FROM python:3.9-slim as requirements-stage WORKDIR /tmp RUN pip install poetry COPY ./pyproject.toml ./poetry.lock* /tmp/ RUN poetry export -f requirements.txt --output requirements.txt --without-hashes FROM python:3.9-slim WORKDIR /code COPY --from=requirements-stage /tmp/requirements.txt /code/requirements.txt RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt COPY ./app /code/app CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]

3. Uvicorn과 uvloop

3.1 uvloop이란?

uvloop은 Python의 기본 asyncio 이벤트 루프를 대체하는 라이브러리로, libuv 기반으로 구현되었습니다. Cython으로 작성되어 매우 높은 성능을 제공합니다.

uvloop 설정 방법

python
import asyncio import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

4. Vibora: Flask 스타일의 고속 비동기 프레임워크

Vibora는 Flask와 유사한 API를 제공하며, 자체 HTTP 서버를 내장한 고속 비동기 프레임워크입니다.

주요 특징:

  • uvloop와 C 확장을 활용하여 높은 성능 제공.
  • Gunicorn이나 Nginx 없이도 멀티 코어 활용 가능.
  • WebSocket 및 비동기 작업 지원.

한계:

  • 현재 알파(Alpha) 단계로 안정성이 부족할 수 있음.
  • 공식 문서와 커뮤니티 지원이 제한적.

5. ASGI 서버 구성 전략

단일 워커 vs 멀티 워커

전략설명
단일 워커컨테이너 오케스트레이터(Kubernetes 등) 환경에서 적합하며 간단한 테스트 환경에 유용함.
멀티 워커Gunicorn + Uvicorn 조합으로 안정성과 확장성을 제공하며 고부하 환경에서 적합함.

검증된 내용 요약

  1. ASGI 표준
    • 공식 문서에서 ASGI는 WSGI의 계승자로 정의되며 WebSocket 및 HTTP/2 같은 현대적인 요구사항을 충족합니다.
  2. Uvicorn + Gunicorn
    • Uvicorn은 단일 워커로 빠르고 가볍게 동작하며, Gunicorn과 결합하면 멀티 프로세스 환경에서도 안정적으로 작동합니다.
  3. uvloop
    • MagicStack/uvloop GitHub 리포지토리에서 성능 비교 결과 asyncio 기본 루프보다 훨씬 빠르다고 명시되어 있습니다.
  4. FastAPI
    • FastAPI 공식 문서에서 Node.js 및 Go에 필적하는 성능을 제공한다고 언급되었으며, 이는 Starlette와 Pydantic 덕분입니다.
  5. Vibora
    • Vibora는 높은 성능을 목표로 하지만 알파 단계이며 실험적인 프로젝트에 적합합니다(공식 GitHub README 참조).

6. 정리 및 추천

Python 백엔드 개발에서 ASGI는 현대적인 비동기 요구사항(WebSocket, HTTP/2 등)을 충족하기 위한 표준입니다.
  1. Uvicorn + Gunicorn 조합
    • 안정적이고 확장 가능한 멀티 워커 환경 제공.
    • Kubernetes 같은 컨테이너 오케스트레이터에서도 잘 작동.
  2. FastAPI
    • 빠르고 현대적인 API 개발에 적합.
    • 자동화된 문서화와 Python 타입 힌트를 활용한 높은 생산성 제공.
  3. uvloop
    • asyncio 이벤트 루프를 대체하여 성능 향상.
  4. Vibora
    • 실험적인 프로젝트나 초고속 처리가 필요한 경우 고려 가능.
Python 백엔드 아키텍처는 다양한 도구와 프레임워크를 통해 유연하게 구성할 수 있습니다.