실시간 알림(주문 상태, 재고 변화) 뿌리는 엔드포인트가 있는데, 그동안 polling 기반이었음. SSE + redis pubsub 조합으로 전환. FastAPI 0.68, aioredis 2.0(이번에 2.0 GA 됨), Redis 6.2.
구조는 단순.
import asyncio
import aioredis
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
redis = aioredis.from_url("redis://localhost")
async def event_stream(user_id: str):
pubsub = redis.pubsub()
await pubsub.subscribe(f"user:{user_id}")
try:
async for msg in pubsub.listen():
if msg["type"] == "message":
yield {"event": "update", "data": msg["data"].decode()}
finally:
await pubsub.unsubscribe(f"user:{user_id}")
await pubsub.close()
@app.get("/stream/{user_id}")
async def stream(user_id: str):
return EventSourceResponse(event_stream(user_id))
발행 측은 worker.
await redis.publish(f"user:{user_id}", json.dumps(payload))
aioredis 2.0은 redis-py와 API 거의 일치해서 이관이 쉬움(기존 1.x 대비 많이 달라졌음). 근데 내부 구현이 전면 재작성돼서 초기 버그가 좀 있었다. 연결 종료 시 ConnectionError가 이벤트루프 종료 타이밍과 얽혀서 워닝 떠서 pubsub.close() 명시 호출로 해결.
운영 이슈:
- 클러스터 뒤에서 pubsub 채널은 노드 간 전파됨. 브로드캐스트 트래픽이 많아지면 클러스터 전체 부하. 한 채널당 약 500명 이상 구독 붙으면 per-channel 말고 glob으로 재설계 권장.
- keepalive 없으면 로드밸런서가 유휴 연결 끊음. SSE에 30초마다 comment(
: keepalive\n\n) 보내기. - worker 측에서 발행 실패를 조용히 삼키지 말 것. 메시지 유실되면 사용자가 "왜 알림 안 오지"를 느낀다.
결론 — 단방향 push는 SSE가 최적. WebSocket보다 프록시 친화적이고 구현 간단. 양방향 필요하면 ws 불가피.
댓글 없음:
댓글 쓰기