20210811

FastAPI + Redis pub/sub 실시간 처리

실시간 알림(주문 상태, 재고 변화) 뿌리는 엔드포인트가 있는데, 그동안 polling 기반이었음. SSE + redis pubsub 조합으로 전환. FastAPI 0.68, aioredis 2.0(이번에 2.0 GA 됨), Redis 6.2.

구조는 단순.

import asyncio
import aioredis
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

app = FastAPI()
redis = aioredis.from_url("redis://localhost")

async def event_stream(user_id: str):
    pubsub = redis.pubsub()
    await pubsub.subscribe(f"user:{user_id}")
    try:
        async for msg in pubsub.listen():
            if msg["type"] == "message":
                yield {"event": "update", "data": msg["data"].decode()}
    finally:
        await pubsub.unsubscribe(f"user:{user_id}")
        await pubsub.close()

@app.get("/stream/{user_id}")
async def stream(user_id: str):
    return EventSourceResponse(event_stream(user_id))

발행 측은 worker.

await redis.publish(f"user:{user_id}", json.dumps(payload))

aioredis 2.0은 redis-py와 API 거의 일치해서 이관이 쉬움(기존 1.x 대비 많이 달라졌음). 근데 내부 구현이 전면 재작성돼서 초기 버그가 좀 있었다. 연결 종료 시 ConnectionError가 이벤트루프 종료 타이밍과 얽혀서 워닝 떠서 pubsub.close() 명시 호출로 해결.

운영 이슈:

  1. 클러스터 뒤에서 pubsub 채널은 노드 간 전파됨. 브로드캐스트 트래픽이 많아지면 클러스터 전체 부하. 한 채널당 약 500명 이상 구독 붙으면 per-channel 말고 glob으로 재설계 권장.
  2. keepalive 없으면 로드밸런서가 유휴 연결 끊음. SSE에 30초마다 comment(: keepalive\n\n) 보내기.
  3. worker 측에서 발행 실패를 조용히 삼키지 말 것. 메시지 유실되면 사용자가 "왜 알림 안 오지"를 느낀다.

결론 — 단방향 push는 SSE가 최적. WebSocket보다 프록시 친화적이고 구현 간단. 양방향 필요하면 ws 불가피.