1. 전체 아키텍처 개요
2.1 데이터 수집 예제 코드
import pandas as pd
from sqlalchemy import create_engine
# Database connection (PostgreSQL).
# NOTE(review): credentials are hard-coded in the DSN — move them to an
# environment variable or secrets manager before using outside a demo.
engine = create_engine('postgresql://user:password@localhost:5432/company_db')
def extract_data(since: str = '2023-01-01'):
    """Load internal documents created after *since* from the company DB.

    Parameters
    ----------
    since : str
        ISO date cutoff; only rows with ``created_at`` later than this are
        returned. Defaults to the previously hard-coded value, so existing
        callers are unaffected.

    Returns
    -------
    pandas.DataFrame with columns id, title, content, department.
    """
    # sqlalchemy is already a dependency of this script (create_engine above).
    from sqlalchemy import text

    # Bind the cutoff as a query parameter instead of interpolating it into
    # the SQL string — the driver handles quoting, and the date becomes
    # configurable without opening an injection vector.
    query = text("""
        SELECT id, title, content, department
        FROM internal_documents
        WHERE created_at > :since
    """).bindparams(since=since)
    return pd.read_sql(query, engine)
# Data preprocessing
def preprocess_data(df):
    """Deduplicate documents by title and build a flat prompt-style text column.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'title', 'department' and 'content' columns.

    Returns
    -------
    pandas.DataFrame
        Deduplicated copy with an added 'processed_text' column.
    """
    # .copy() so the assignment below writes to an independent frame
    # rather than a view (avoids SettingWithCopyWarning).
    df = df.drop_duplicates(subset=['title']).copy()

    def _render(row):
        # NULL content comes back from SQL as NaN (a float); slicing it
        # would raise TypeError, so fall back to an empty string.
        content = row['content'] if isinstance(row['content'], str) else ''
        return (
            f"DOCUMENT TITLE: {row['title']}\n"
            f"DEPARTMENT: {row['department']}\n"
            f"CONTENT: {content[:5000]}"  # cap content length per document
        )

    df['processed_text'] = df.apply(_render, axis=1)
    return df
# Run the pipeline: pull documents from the DB, then normalize them.
raw_data = extract_data()
processed_data = preprocess_data(raw_data)
2.2 모델 학습
학습 환경
from transformers import (
AutoTokenizer,
AutoModelForCausalLM,
TrainingArguments,
Trainer
)
from datasets import Dataset
# Convert the preprocessed DataFrame into a HuggingFace Dataset.
dataset = Dataset.from_pandas(processed_data[['id', 'processed_text']])

# Load the tokenizer. Causal-LM tokenizers often ship without a pad token,
# so EOS is reused for padding.
tokenizer = AutoTokenizer.from_pretrained("deepseek-ai/deepseek-base")
tokenizer.pad_token = tokenizer.eos_token

def tokenize_function(examples):
    """Tokenize a batch of 'processed_text' entries, truncated to 1024 tokens."""
    return tokenizer(examples["processed_text"], truncation=True, max_length=1024)

tokenized_dataset = dataset.map(tokenize_function, batched=True)
학습 실행
# Load the base model.
model = AutoModelForCausalLM.from_pretrained("deepseek-ai/deepseek-base")

# Causal-LM fine-tuning needs a collator that pads each batch and copies
# input_ids into labels; without one the Trainer has no loss to optimize.
from transformers import DataCollatorForLanguageModeling
data_collator = DataCollatorForLanguageModeling(tokenizer, mlm=False)

# Training hyper-parameters.
# NOTE(review): evaluation was configured (evaluation_strategy="steps",
# eval_steps=500) but no eval_dataset was ever passed to the Trainer,
# which makes it raise at startup — evaluation is disabled here until an
# eval split exists.
training_args = TrainingArguments(
    output_dir="./results",
    learning_rate=5e-5,
    per_device_train_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir='./logs',
    report_to="tensorboard"
)

# Trainer wiring.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    data_collator=data_collator,
    tokenizer=tokenizer
)

# Run fine-tuning, then persist model and tokenizer into the same
# directory so it can be loaded back with pipeline()/from_pretrained.
trainer.train()
model.save_pretrained("./fine_tuned_model")
tokenizer.save_pretrained("./fine_tuned_model")
2.3 API 서비스 구축
FastAPI 서버 구현
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import pipeline
import torch
app = FastAPI()

# Load the fine-tuned checkpoint into a text-generation pipeline,
# on GPU when available.
device = "cuda" if torch.cuda.is_available() else "cpu"
qa_pipeline = pipeline(
    "text-generation",
    model="./fine_tuned_model",
    # NOTE(review): passing a device *string* requires a reasonably recent
    # transformers; older versions expect an int index — confirm version.
    device=device
)
class Query(BaseModel):
    """Request body for the /ask endpoints."""
    text: str     # the user's question
    user_id: str  # caller identifier (not read by the /ask handler below)
@app.post("/ask")
async def generate_answer(query: Query):
    """Generate an answer for *query.text* with the fine-tuned model.

    Returns {"answer": <generated continuation>}; raises HTTP 500 on
    generation failure without exposing internal error details.
    """
    prompt = f"QUESTION: {query.text}\nANSWER:"
    try:
        response = qa_pipeline(
            prompt,
            max_length=500,
            temperature=0.7,
            do_sample=True
        )
        generated = response[0]['generated_text']
        # text-generation pipelines echo the prompt in their output; strip
        # it so the client receives the answer, not the template.
        if generated.startswith(prompt):
            generated = generated[len(prompt):].strip()
        return {"answer": generated}
    except Exception:
        # Don't leak internals to clients — str(e) may contain file paths
        # or configuration details. Log server-side instead.
        import logging
        logging.exception("answer generation failed")
        raise HTTPException(status_code=500, detail="Answer generation failed")
Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Install dependencies BEFORE copying the source tree: this way editing a
# source file doesn't invalidate the (slow) pip layer. --no-cache-dir keeps
# the image from carrying pip's download cache.
RUN pip install --no-cache-dir torch transformers fastapi uvicorn
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
2.4 RAG(Retrieval-Augmented Generation) 구현
벡터 데이터베이스 설정
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

# Embedding model (multilingual MiniLM, 384-dim vectors).
encoder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

# Qdrant client pointed at a local instance on the default port.
client = QdrantClient("localhost", port=6333)

# Create the collection that will hold document embeddings.
# NOTE(review): create_collection fails if the collection already exists —
# re-running this script needs an existence check or recreate_collection;
# confirm against the qdrant-client version in use.
client.create_collection(
    collection_name="company_knowledge",
    vectors_config=VectorParams(
        size=384,  # must match the encoder's embedding dimension
        distance=Distance.COSINE
    )
)
# 데이터 인덱싱
def index_documents(documents):
    """Embed and upsert *documents* into the company_knowledge collection.

    Each document must carry 'id' and 'processed_text' keys; the whole
    document dict is stored as the point payload.

    The original version made one encode call and one network round-trip
    per document; embeddings are now computed in a single batch and sent
    in a single upsert.
    """
    if not documents:
        return  # nothing to index
    texts = [doc['processed_text'] for doc in documents]
    vectors = encoder.encode(texts)  # one batched forward pass
    client.upsert(
        collection_name="company_knowledge",
        points=[
            {"id": doc['id'], "vector": vec.tolist(), "payload": doc}
            for doc, vec in zip(documents, vectors)
        ]
    )
RAG 통합 서비스
def rag_answer(query, top_k=3):
    """Answer *query* using retrieval-augmented generation.

    Parameters
    ----------
    query : str
        The user's question.
    top_k : int
        Number of nearest documents to retrieve as context (default 3,
        matching the previous hard-coded limit).

    Returns
    -------
    str — the generated answer, with the prompt template stripped.
    """
    # Retrieve the most similar documents from the vector store.
    query_embedding = encoder.encode(query).tolist()
    results = client.search(
        collection_name="company_knowledge",
        query_vector=query_embedding,
        limit=top_k
    )
    # Assemble the retrieved texts into a single context block.
    context = "\n".join(hit.payload['processed_text'] for hit in results)
    # Prompt template (kept byte-identical to the original).
    prompt = f"""당신은 회사의 AI 어시스턴트입니다. 다음 정보를 바탕으로 질문에 답해주세요.
관련 문서:
{context}
질문: {query}
답변:"""
    response = qa_pipeline(prompt, max_length=1000)
    generated = response[0]['generated_text']
    # The pipeline echoes the prompt; return only the continuation so the
    # caller gets the answer rather than the whole template.
    if generated.startswith(prompt):
        generated = generated[len(prompt):].strip()
    return generated
3. 모니터링 및 유지보수
로그 시스템 구성
import logging
import os
from datetime import datetime

# basicConfig only creates the log *file*, not its parent directory —
# without this, a missing logs/ directory raises FileNotFoundError at
# import time.
os.makedirs('logs', exist_ok=True)

# Daily-named log file with timestamped, leveled entries.
logging.basicConfig(
    filename=f'logs/api_{datetime.now().strftime("%Y%m%d")}.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# `Request` was referenced in the signature below but never imported —
# without this import the module dies with NameError when the decorator
# line is evaluated.
from fastapi import Request

@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log method, path, status code and wall-clock duration of each request."""
    start_time = datetime.now()
    response = await call_next(request)
    duration = (datetime.now() - start_time).total_seconds()
    logging.info(
        f"Method={request.method} "
        f"Path={request.url.path} "
        f"Status={response.status_code} "
        f"Duration={duration:.2f}s"
    )
    return response
지속적 학습 시스템
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# Default task behavior: retry up to 3 times, 5 minutes apart.
default_args = {
    'owner': 'ai-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

# Weekly retraining DAG. catchup=False keeps Airflow from back-filling a
# run for every week between start_date (2023-01-01) and today on first
# deployment — almost certainly unintended for a retraining job.
dag = DAG(
    'weekly_retraining',
    default_args=default_args,
    schedule_interval='@weekly',
    start_date=datetime(2023, 1, 1),
    catchup=False
)
def retrain():
    """Weekly retraining task (illustrative pseudocode).

    NOTE(review): extract_recent_data, evaluate_model and deploy_new_model
    are not defined anywhere in this file, and ``TrainingArguments(...)``
    is a literal Ellipsis placeholder — this function will not run as
    written; it sketches the intended flow only.
    """
    # Collect new data
    new_data = extract_recent_data()
    # Continue fine-tuning from the previously saved checkpoint
    trainer = Trainer(
        model=AutoModelForCausalLM.from_pretrained("./fine_tuned_model"),
        train_dataset=new_data,
        args=TrainingArguments(...)  # placeholder — real hyper-params needed
    )
    trainer.train()
    # Gate deployment on model quality
    if evaluate_model() > 0.85:  # accuracy threshold
        deploy_new_model()
# Register the retraining callable as the DAG's single task.
retrain_task = PythonOperator(
    task_id='retrain_model',
    python_callable=retrain,
    dag=dag
)
4. 보안 고려사항
API 보안 강화
from fastapi.security import APIKeyHeader
from fastapi import Depends, Security
# `os` was used but never imported in the original snippet; secrets gives
# a constant-time comparison for the key check.
import os
import secrets

api_key_header = APIKeyHeader(name="X-API-Key")

async def validate_api_key(api_key: str = Security(api_key_header)):
    """Reject requests whose X-API-Key header doesn't match API_SECRET_KEY.

    Raises HTTP 403 when the key is wrong OR when the server-side secret
    is unset (a plain `!=` against None would otherwise accept nothing and
    leak timing information).
    """
    expected = os.getenv("API_SECRET_KEY")
    # compare_digest is constant-time, so the comparison doesn't leak key
    # length/prefix information through response timing.
    if not expected or not secrets.compare_digest(api_key, expected):
        raise HTTPException(
            status_code=403,
            detail="Invalid API Key"
        )
# Same handler as /ask but gated by the API-key dependency above.
@app.post("/secure/ask")
async def secure_ask(
    query: Query,
    _: None = Depends(validate_api_key)
):
    """Authenticated variant of /ask; delegates to generate_answer."""
    return await generate_answer(query)
데이터 암호화
from cryptography.fernet import Fernet
import os

# The original generated a fresh key on every process start, which makes
# every previously encrypted payload permanently unreadable after a
# restart. Prefer a key supplied via the FERNET_KEY env var; fall back to
# generating one (original behavior) for throwaway/dev runs.
_env_key = os.getenv("FERNET_KEY")
key = _env_key.encode() if _env_key else Fernet.generate_key()
cipher_suite = Fernet(key)

def encrypt_data(data: str) -> bytes:
    """Encrypt a UTF-8 string; returns a Fernet token (bytes)."""
    return cipher_suite.encrypt(data.encode())

def decrypt_data(encrypted_data: bytes) -> str:
    """Inverse of encrypt_data.

    Raises cryptography.fernet.InvalidToken if the token was tampered with
    or was produced under a different key.
    """
    return cipher_suite.decrypt(encrypted_data).decode()
댓글 없음:
댓글 쓰기