Как правильно управлять FastAPI WebSockets с помощью AIOKafka Consumer

У меня есть простая конечная точка веб-сокета в FastAPI, которая получает данные с сервера Kafka с пакетом AIOKafka и отправляет их через веб-сокет.

      @router.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)

    loop = asyncio.get_event_loop()
    consumer = AIOKafkaConsumer("kafka-producer", loop=loop,
                                bootstrap_servers=f"{settings.kafka.host}:{settings.kafka.port}")

    await consumer.start()

    try:
        while True:
            async for msg in consumer:
                await manager.send_data(msg.value, websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
    finally:
        await consumer.stop()

Мой вопрос здесь: правильно ли иметь одного потребителя Kafka на каждый веб-сокет. Всегда ли get_event_loop возвращает один и тот же текущий цикл? Это может вызвать проблемы?

Я пытался управлять одним потребителем на каждый веб-сокет и вызывать текущий цикл для каждого веб-сокета. Я ищу, существует ли лучший способ справиться с этим.

0 ответов

Другие вопросы по тегам