Потребитель производителя asyncio звонит из не асинхронного основного потока
Я пытаюсь запустить asyncio в теме. Причина, по которой я это делаю, заключается в том, что я хочу общаться с asyncio из основного потока, не являющегося асинхронным. Так вот мой код
import asyncio
import threading
async def consume(input_q, output_q):
while True:
item = await input_q.get()
print(item)
output_q.put("hello back")
async def run(input_q, output_q):
asyncio.ensure_future(consume(input_q, output_q))
while True:
await asyncio.sleep(1)
print("message")
def run_in_thread(loop,input_q,output_q):
asyncio.set_event_loop(loop)
loop.run_until_complete(run(input_q, output_q))
loop = asyncio.new_event_loop()
input_q = asyncio.Queue(loop=loop)
output_q = asyncio.Queue(loop=loop)
thread = threading.Thread(target=run_in_thread ,args=(loop, input_q, output_q))
thread.start()
time.sleep(5)
asyncio.run_coroutine_threadsafe(input_q.put("hello input"), loop)
time.sleep(2)
item = asyncio.run_coroutine_threadsafe(output_q.get_nowait(), loop).result()
assert "hello back" in item
print(item)
loop.call_soon_threadsafe(loop.stop)
thread.join()
Я могу помещать сообщения в очередь ввода из основного потока. Булочка, к сожалению, не умеет читать из output_q. Есть ли решение для этого? Получение ошибки в этой строке:
item = asyncio.run_coroutine_threadsafe(output_q.get_nowait(), loop).result()
--> raises QueueEmpty
Может быть, используя многопроцессорность или что-то еще. То, что я хотел бы сделать, это запустить вышеуказанные функции и общаться с ними из другого процесса или потока, который не использует асинхронный. Большое спасибо за любой намек!
PS: я использую python 3.5