python fastapi, проблема голодания айокафки

Проблема: потребитель aiokafka недополучает конечную точку fastapi, из-за чего наши проверки работоспособности kubernetes дают сбой, а время ожидания любого другого сервиса, вызывающего открытые конечные точки, истекает.

Подробности :

Существует потребитель kafka, который запускается во время запуска fastapi и продолжает слушать конкретную тему.

А еще есть конечная точка fastapi, которая обслуживает запрос.

Когда в разделе темы kafka много сообщений, потребитель kafka голодает в цикле событий, а запросы, обслуживаемые конечными точками fastapi, истекают.

Как мы можем решить эту проблему?

      #all the imports



consumer = None
consumer_task = None

log = None

def get_application():
    #initialize fastapi app and with different routes and do some stuff
    return app

app = get_application()

@app.on_event("startup")
async def startup_event():
    #initialize consumer
    await initialize()
    # start consuming
    await consume()

@app.on_event("shutdown")
async def shutdown_event():
    #close consumer

async def initialize():
    #initilize
    # get cluster layout and join group
    await consumer.start()
    await consumer.seek_to_committed()

async def consume():
    global consumer_task
    loop = asyncio.get_event_loop()
    consumer_task = loop.create_task(send_consumer_message(consumer))


async def send_consumer_message(consumer):
    try:
        # consume messages
        async for msg in consumer:
            #do message processing
    except Exception as e:
        log.info(f"message consuming failed withe error: {repr(e)}")
    finally:
        log.warning("stopping consumer")
        await consumer.stop()

0 ответов

Другие вопросы по тегам