Как правильно управлять FastAPI WebSockets с помощью AIOKafka Consumer
У меня есть простая конечная точка веб-сокета в FastAPI, которая получает данные с сервера Kafka с пакетом AIOKafka и отправляет их через веб-сокет.
@router.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
loop = asyncio.get_event_loop()
consumer = AIOKafkaConsumer("kafka-producer", loop=loop,
bootstrap_servers=f"{settings.kafka.host}:{settings.kafka.port}")
await consumer.start()
try:
while True:
async for msg in consumer:
await manager.send_data(msg.value, websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
finally:
await consumer.stop()
Мой вопрос здесь: правильно ли иметь одного потребителя Kafka на каждый веб-сокет. Всегда ли get_event_loop возвращает один и тот же текущий цикл? Это может вызвать проблемы?
Я пытался управлять одним потребителем на каждый веб-сокет и вызывать текущий цикл для каждого веб-сокета. Я ищу, существует ли лучший способ справиться с этим.