20250803

Deepseek 를 활용한 내부 AI 플랫폼

1. 전체 아키텍처 개요



2. 단계별 상세 구현

2.1 데이터 준비 및 전처리

필요한 라이브러리 설치

pip install pandas sqlalchemy transformers datasets

데이터 수집 예제 코드

import pandas as pd
from sqlalchemy import create_engine

# 데이터베이스 연결
engine = create_engine('postgresql://user:password@localhost:5432/company_db')

def extract_data():
    query = """
    SELECT id, title, content, department 
    FROM internal_documents
    WHERE created_at > '2023-01-01'
    """
    return pd.read_sql(query, engine)

# 데이터 전처리
def preprocess_data(df):
    # 중복 제거
    df = df.drop_duplicates(subset=['title'])
    
    # 텍스트 정규화
    df['processed_text'] = df.apply(
        lambda x: f"DOCUMENT TITLE: {x['title']}\nDEPARTMENT: {x['department']}\nCONTENT: {x['content'][:5000]}", 
        axis=1
    )
    return df

# 실행
raw_data = extract_data()
processed_data = preprocess_data(raw_data)


2.2 모델 학습

학습 환경

from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TrainingArguments,
    Trainer
)
from datasets import Dataset

# 데이터셋 변환
dataset = Dataset.from_pandas(processed_data[['id', 'processed_text']])

# 토크나이저 로드
tokenizer = AutoTokenizer.from_pretrained("deepseek-ai/deepseek-base")
tokenizer.pad_token = tokenizer.eos_token

# 토크나이징 함수
def tokenize_function(examples):
    return tokenizer(examples["processed_text"], truncation=True, max_length=1024)

tokenized_dataset = dataset.map(tokenize_function, batched=True)

학습 실행

# 모델 로드
model = AutoModelForCausalLM.from_pretrained("deepseek-ai/deepseek-base")

# 학습 파라미터
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="steps",
    eval_steps=500,
    learning_rate=5e-5,
    per_device_train_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir='./logs',
    report_to="tensorboard"
)

# 트레이너 설정
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    tokenizer=tokenizer
)

# 학습 시작
trainer.train()

# 모델 저장
model.save_pretrained("./fine_tuned_model")
tokenizer.save_pretrained("./fine_tuned_model")

2.3 API 서비스 구축

FastAPI 서버 구현

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import pipeline
import torch

app = FastAPI()

# 모델 로드
device = "cuda" if torch.cuda.is_available() else "cpu"
qa_pipeline = pipeline(
    "text-generation",
    model="./fine_tuned_model",
    device=device
)

class Query(BaseModel):
    text: str
    user_id: str

@app.post("/ask")
async def generate_answer(query: Query):
    try:
        response = qa_pipeline(
            f"QUESTION: {query.text}\nANSWER:",
            max_length=500,
            temperature=0.7,
            do_sample=True
        )
        return {"answer": response[0]['generated_text']}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Dockerfile

FROM python:3.9-slim

WORKDIR /app
COPY . .

RUN pip install torch transformers fastapi uvicorn

EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

2.4 RAG(Retrieval-Augmented Generation) 구현

벡터 데이터베이스 설정

from sentence_transformers import SentenceTransformer

from qdrant_client import QdrantClient

from qdrant_client.models import Distance, VectorParams


# 임베딩 모델

encoder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')


# Qdrant 클라이언트

client = QdrantClient("localhost", port=6333)


# 컬렉션 생성

client.create_collection(

    collection_name="company_knowledge",

    vectors_config=VectorParams(

        size=384,  # 임베딩 차원

        distance=Distance.COSINE

    )

)


# 데이터 인덱싱

def index_documents(documents):

    for doc in documents:

        client.upsert(

            collection_name="company_knowledge",

            points=[{

                "id": doc['id'],

                "vector": encoder.encode(doc['processed_text']).tolist(),

                "payload": doc

            }]

        )


RAG 통합 서비스

def rag_answer(query):
    # 지식 검색
    query_embedding = encoder.encode(query).tolist()
    results = client.search(
        collection_name="company_knowledge",
        query_vector=query_embedding,
        limit=3
    )
    
    # 컨텍스트 조합
    context = "\n".join([hit.payload['processed_text'] for hit in results])
    
    # 프롬프트 엔지니어링
    prompt = f"""당신은 회사의 AI 어시스턴트입니다. 다음 정보를 바탕으로 질문에 답해주세요.

관련 문서:
{context}

질문: {query}

답변:"""
    
    # 생성 실행
    response = qa_pipeline(prompt, max_length=1000)
    return response[0]['generated_text']

3. 모니터링 및 유지보수

로그 시스템 구성

import logging
from datetime import datetime

logging.basicConfig(
    filename=f'logs/api_{datetime.now().strftime("%Y%m%d")}.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = datetime.now()
    response = await call_next(request)
    duration = (datetime.now() - start_time).total_seconds()
    
    logging.info(
        f"Method={request.method} "
        f"Path={request.url.path} "
        f"Status={response.status_code} "
        f"Duration={duration:.2f}s"
    )
    return response

지속적 학습 시스템

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ai-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'weekly_retraining',
    default_args=default_args,
    schedule_interval='@weekly',
    start_date=datetime(2023, 1, 1)
)

def retrain():
    # 새 데이터 수집
    new_data = extract_recent_data()
    
    # 재학습
    trainer = Trainer(
        model=AutoModelForCausalLM.from_pretrained("./fine_tuned_model"),
        train_dataset=new_data,
        args=TrainingArguments(...)
    )
    trainer.train()
    
    # 모델 평가
    if evaluate_model() > 0.85:  # 정확도 임계값
        deploy_new_model()

retrain_task = PythonOperator(
    task_id='retrain_model',
    python_callable=retrain,
    dag=dag
)

4. 보안 고려사항

API 보안 강화

from fastapi.security import APIKeyHeader
from fastapi import Depends, Security

api_key_header = APIKeyHeader(name="X-API-Key")

async def validate_api_key(api_key: str = Security(api_key_header)):
    if api_key != os.getenv("API_SECRET_KEY"):
        raise HTTPException(
            status_code=403,
            detail="Invalid API Key"
        )

@app.post("/secure/ask")
async def secure_ask(
    query: Query, 
    _: None = Depends(validate_api_key)
):
    return await generate_answer(query)

데이터 암호화

from cryptography.fernet import Fernet

# 키 생성 (최초 1회)
key = Fernet.generate_key()
cipher_suite = Fernet(key)

def encrypt_data(data: str) -> bytes:
    return cipher_suite.encrypt(data.encode())

def decrypt_data(encrypted_data: bytes) -> str:
    return cipher_suite.decrypt(encrypted_data).decode()

 

댓글 없음: