python fastapi, проблема голодания айокафки
Проблема: потребитель aiokafka недополучает конечную точку fastapi, из-за чего наши проверки работоспособности kubernetes дают сбой, а время ожидания любого другого сервиса, вызывающего открытые конечные точки, истекает.
Подробности :
Существует потребитель kafka, который запускается во время запуска fastapi и продолжает слушать конкретную тему.
А еще есть конечная точка fastapi, которая обслуживает запрос.
Когда в разделе темы kafka много сообщений, потребитель kafka голодает в цикле событий, а запросы, обслуживаемые конечными точками fastapi, истекают.
Как мы можем решить эту проблему?
#all the imports
consumer = None
consumer_task = None
log = None
def get_application():
#initialize fastapi app and with different routes and do some stuff
return app
app = get_application()
@app.on_event("startup")
async def startup_event():
#initialize consumer
await initialize()
# start consuming
await consume()
@app.on_event("shutdown")
async def shutdown_event():
#close consumer
async def initialize():
#initilize
# get cluster layout and join group
await consumer.start()
await consumer.seek_to_committed()
async def consume():
global consumer_task
loop = asyncio.get_event_loop()
consumer_task = loop.create_task(send_consumer_message(consumer))
async def send_consumer_message(consumer):
try:
# consume messages
async for msg in consumer:
#do message processing
except Exception as e:
log.info(f"message consuming failed withe error: {repr(e)}")
finally:
log.warning("stopping consumer")
await consumer.stop()