Может ли concurrent.futures.Future быть преобразован в asyncio.Future?
Я практикуюсь asyncio
после написания многопоточного кода много лет.
Заметил что-то, что мне кажется странным. Оба в asyncio
И в concurrent
E сть Future
объект.
from asyncio import Future
from concurrent.futures import Future
Угадай, у каждого своя роль...
Мой вопрос, могу ли я перевести concurrent.future.Future
в asyncio.Future
(или наоборот)?
2 ответа
concurrent.futures.Future
это объект, который мы используем для написания асинхронного кода на основе потоков ОС или процессов ОС наряду с другими вещами concurrent.futures
Модуль предоставляет нам.
asyncio.Future
это объект, который мы используем для написания асинхронного кода на основе сопрограмм и тому подобного asyncio
Модуль предоставляет нам.
Другими словами concurrent.futures
а также asyncio
попытаться решить ту же задачу, но по-разному. Решение одной и той же задачи означает, что многие вещи будут похожи в подходе на основе потоков / процессов и подходе на основе сопрограмм. Например, взгляните на asyncio.Lock и threading.Lock - похожие, но разные.
Возможен ли переход между разными такими похожими объектами? Нет, это не так.
Существенная разница между asyncio
и основанные на потоке модули делают сотрудничество неспособным:
В Asyncio вы должны
await
вещи, чтобы приостановить поток выполнения и позволить другим сопрограммам быть выполненными тем временем.В потоковых модулях выполнение потока приостанавливается из-за приостановки всего потока.
Например, когда вы пишете код на основе потоков, вы пишете:
future = concurrent.futures.Future()
# ...
result = future.result() # acts like time.sleep blocking whole thread
Но в asyncio вы не должны блокировать поток, вы должны вернуть управление в цикл обработки событий:
future = asyncio.Future()
# ...
result = await future # block current execution flow returning control to event loop
# without blocking thread,
# current flow will be resumed later
Мой вопрос, могу ли я перевести
concurrent.future.Future
вasyncio.Future
(или наоборот)?
Если под "переносом" вы подразумеваете "преобразование одного в другое", да, это возможно, хотя для устранения несоответствия импеданса между подходами может потребоваться определенная работа.
Чтобы преобразовать concurrent.futures.Future
в asyncio.Future
, ты можешь позвонить asyncio.wrap_future
, Возвращенное будущее asyncio ожидаемо в цикле событий asyncio и завершится, когда завершится базовое будущее потоков. Это эффективно, как run_in_executor
реализовано.
Не существует общедоступной функциональности для прямого преобразования будущего asyncio в concurrent.futures
будущее, но есть asyncio.run_coroutine_threadsafe
Функция, которая принимает сопрограмму, отправляет ее в цикл обработки событий и возвращает параллельное будущее, которое завершается, когда это происходит в асинхронном будущем. Это может быть использовано для эффективного преобразования любого асинхронного будущего в параллельное будущее, например так:
def to_concurrent(fut, loop):
async def wait():
await fut
return asyncio.run_coroutine_threadsafe(wait(), loop)
Возвращенное будущее будет вести себя так, как вы ожидаете от параллельного будущего, например, его result()
метод блокируется и т. д. Одна вещь, о которой вы можете позаботиться, это то, что обратные вызовы выполняются add_done_callback
(как всегда) выполняется в потоке, который завершает будущее, которое в этом случае является потоком цикла обработки событий. Это означает, что если вы добавляете готовые обратные вызовы, вы должны быть осторожны, чтобы не вызывать блокирующие вызовы в их реализации, чтобы не заблокировать цикл обработки событий.
Есть функция под названием
wrap_future
в asyncio.
Оберните объект concurrent.futures.Future в объект asyncio.Future.
См. https://docs.python.org/3/library/asyncio-future.html#asyncio.wrap_future
Что касается части "параллельное будущее в будущее asyncio", вот утилита, которую я использую.
from typing import List, Any
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio
class AsyncThreadPool(ThreadPoolExecutor):
_futures: List[asyncio.Future]
_loop: asyncio.AbstractEventLoop
def __init__(self, max_workers=None):
super().__init__(max_workers)
self._futures = []
def queue(self, fn):
self._loop = asyncio.get_event_loop()
fut = self._loop.create_future()
self._futures.append(fut)
self.submit(self._entry, fn, fut)
def queueAsync(self, coroutine):
def newLoop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(coroutine)
self.queue(newLoop)
def _entry(self, fn, fut: asyncio.Future):
try:
result = fn()
self._loop.call_soon_threadsafe(fut.set_result, result)
except Exception as e:
self._loop.call_soon_threadsafe(fut.set_exception, e)
async def gather(self) -> List[Any]:
return await asyncio.gather(*self._futures)
Вы можете использовать это так:
with AsyncThreadPool() as pool:
# Queue some sync function (will be executed on another thread)
pool.queue(someHeavySyncFunction)
# Queue a coroutine that will be executed on a new event loop running on another thread
pool.queue(otherAsyncFunction())
# Gather results (non blocking for your current loop)
res: List[Any] = await pool.gather()