Python asyncio, можно ожидать / выдать весь myFunction()

Я написал библиотеку объектов, многие из которых делают вызовы HTTP / IO. Я рассчитывал перейти на asyncio из-за накладных расходов на монтирование, но я не хочу переписывать основной код.

Я надеялся обернуть asyncio вокруг моего кода, чтобы выполнять функции асинхронно, не заменяя весь мой глубокий / низкоуровневый код на await / yield.

Я начал с попытки следующего:

async def my_function1(some_object, some_params):
      #Lots of existing code which uses existing objects
      #No await statements
      return output_data

async def my_function2():
      #Does more stuff

while True:
    loop = asyncio.get_event_loop()
    tasks = my_function(some_object, some_params), my_function2()
    output_data = loop.run_until_complete(asyncio.gather(*tasks))
    print(output_data)

Я быстро понял, что пока этот код выполняется, на самом деле ничего не происходит асинхронно, функции завершаются синхронно. Я очень новичок в асинхронном программировании, но я думаю, что это потому, что ни одна из моих функций не использует ключевое слово await или yield и, следовательно, эти функции не являются сопрограммами и не дают результатов, поэтому не дают возможности перейти к другому cooroutine. Пожалуйста, поправьте меня, если я ошибаюсь.

Мой вопрос заключается в том, можно ли обернуть сложные функции (где глубоко внутри они делают вызовы HTTP / IO) в ключевое слово asyncio await, например

async def my_function():
    print("Welcome to my function")
    data = await bigSlowFunction()

ОБНОВЛЕНИЕ - После ответа Карлсона

После того, как Карлсон принял ответ, я использовал следующий код, который прекрасно работает:

from concurrent.futures import ThreadPoolExecutor
import time    

#Some vars
a_var_1 = 0
a_var_2 = 10

pool = ThreadPoolExecutor(3)

future = pool.submit(my_big_function, object, a_var_1, a_var_2)
while not future.done() :
    print("Waiting for future...")
    time.sleep(0.01)
print("Future done")
print(future.result())

Это работает очень хорошо, и цикл future.done() / sleep дает вам представление о том, сколько циклов ЦП вы используете, выполняя асинхронную работу.

2 ответа

Решение

Короткий ответ: у вас не может быть преимуществ asyncio без явной пометки точек в вашем коде, где управление может быть передано обратно в цикл обработки событий. Это делается путем превращения ваших тяжелых функций ввода-вывода в сопрограммы, как вы и предполагали.

Не изменяя существующий код, вы можете достичь своей цели с помощью гринлетов (взгляните на eventlet или gevent).

Другой возможностью было бы использовать обертывание реализации Python's Future и передавать вызовы уже написанным функциям в некоторый ThreadPoolExecutor и получать итоговое Future. Имейте в виду, что это идет со всеми предостережениями многопоточного программирования, все же.

Нечто подобное

from concurrent.futures import ThreadPoolExecutor

from thinair import big_slow_function

executor = ThreadPoolExecutor(max_workers=5)

async def big_slow_coroutine():
    await executor.submit(big_slow_function)

Начиная с python 3.9 вы можете обернуть блокирующую (не асинхронную) функцию в сопрограмму, чтобы сделать ее ожидаемой, используя asyncio.to_thread(). Пример, приведенный в официальной документации :

      def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54

Это кажется более объединенным подходом, чем использование concurrent.futuresчтобы сделать сопрограмму, но я не тестировал ее широко.

Другие вопросы по тегам