Python3.7 веб-сервер asyncio start (FastAPI) и потребитель aio_pika
В моем проекте я пытаюсь запустить REST API (созданный с помощью FastAPI и запускаемый с Hypercorn), дополнительно я хочу, чтобы при запуске также запускался RabbitMQ Consumer (с aio_pika):
Aio Pika предлагает надежное соединение, которое автоматически восстанавливает соединение в случае сбоя. Если я запустил код ниже сhypercorn app:app
интерфейс потребителя и остального запускается правильно, но переподключение от aio_pika больше не работает. Как я могу заархивировать стабильный рабочий RabbitMQ Consumer и RestAPI в двух разных процессах (или потоках?). Моя версия python - 3.7, обратите внимание, что я на самом деле разработчик Java и Go на случай, если мой подход не похож на Python:-)
@app.on_event("startup")
def startup():
loop = asyncio.new_event_loop()
asyncio.ensure_future(main(loop))
@app.get("/")
def read_root():
return {"Hello": "World"}
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
async with connection:
queue_name = "test_queue"
# Creating channel
channel = await connection.channel() # type: aio_pika.Channel
# Declaring queue
queue = await channel.declare_queue(
queue_name,
auto_delete=True
) # type: aio_pika.Queue
async with queue.iterator() as queue_iter:
# Cancel consuming after __aexit__
async for message in queue_iter:
async with message.process():
print(message.body)
if queue.name in message.body.decode():
break
1 ответ
С помощью @pgjones мне удалось изменить начало потребления на:
@app.on_event("startup")
def startup():
loop = asyncio.get_event_loop()
asyncio.ensure_future(main(loop))
И начать job
с участием asyncio.ensure_future
и передать текущий цикл событий в качестве аргумента, что решило проблему.
Было бы интересно, если бы у кого-то был другой / лучший подход Спасибо!