Может ли concurrent.futures.Future быть преобразован в asyncio.Future?

Я практикуюсь asyncio после написания многопоточного кода много лет.

Заметил что-то, что мне кажется странным. Оба в asyncio И в concurrent E сть Future объект.

from asyncio import Future
from concurrent.futures import Future

Угадай, у каждого своя роль...

Мой вопрос, могу ли я перевести concurrent.future.Future в asyncio.Future (или наоборот)?

2 ответа

Решение

concurrent.futures.Future это объект, который мы используем для написания асинхронного кода на основе потоков ОС или процессов ОС наряду с другими вещами concurrent.futures Модуль предоставляет нам.

asyncio.Future это объект, который мы используем для написания асинхронного кода на основе сопрограмм и тому подобного asyncio Модуль предоставляет нам.

Другими словами concurrent.futures а также asyncio попытаться решить ту же задачу, но по-разному. Решение одной и той же задачи означает, что многие вещи будут похожи в подходе на основе потоков / процессов и подходе на основе сопрограмм. Например, взгляните на asyncio.Lock и threading.Lock - похожие, но разные.

Возможен ли переход между разными такими похожими объектами? Нет, это не так.

Существенная разница между asyncio и основанные на потоке модули делают сотрудничество неспособным:

  • В Asyncio вы должны await вещи, чтобы приостановить поток выполнения и позволить другим сопрограммам быть выполненными тем временем.

  • В потоковых модулях выполнение потока приостанавливается из-за приостановки всего потока.

Например, когда вы пишете код на основе потоков, вы пишете:

future = concurrent.futures.Future()
# ...
result = future.result()  # acts like time.sleep blocking whole thread

Но в asyncio вы не должны блокировать поток, вы должны вернуть управление в цикл обработки событий:

future = asyncio.Future()
# ...
result = await future  # block current execution flow returning control to event loop 
                       # without blocking thread,
                       # current flow will be resumed later

Мой вопрос, могу ли я перевести concurrent.future.Future в asyncio.Future (или наоборот)?

Если под "переносом" вы подразумеваете "преобразование одного в другое", да, это возможно, хотя для устранения несоответствия импеданса между подходами может потребоваться определенная работа.

Чтобы преобразовать concurrent.futures.Future в asyncio.Future, ты можешь позвонить asyncio.wrap_future, Возвращенное будущее asyncio ожидаемо в цикле событий asyncio и завершится, когда завершится базовое будущее потоков. Это эффективно, как run_in_executor реализовано.

Не существует общедоступной функциональности для прямого преобразования будущего asyncio в concurrent.futures будущее, но есть asyncio.run_coroutine_threadsafe Функция, которая принимает сопрограмму, отправляет ее в цикл обработки событий и возвращает параллельное будущее, которое завершается, когда это происходит в асинхронном будущем. Это может быть использовано для эффективного преобразования любого асинхронного будущего в параллельное будущее, например так:

def to_concurrent(fut, loop):
    async def wait():
        await fut
    return asyncio.run_coroutine_threadsafe(wait(), loop)

Возвращенное будущее будет вести себя так, как вы ожидаете от параллельного будущего, например, его result() метод блокируется и т. д. Одна вещь, о которой вы можете позаботиться, это то, что обратные вызовы выполняются add_done_callback (как всегда) выполняется в потоке, который завершает будущее, которое в этом случае является потоком цикла обработки событий. Это означает, что если вы добавляете готовые обратные вызовы, вы должны быть осторожны, чтобы не вызывать блокирующие вызовы в их реализации, чтобы не заблокировать цикл обработки событий.

Есть функция под названием wrap_future в asyncio.

Оберните объект concurrent.futures.Future в объект asyncio.Future.

См. https://docs.python.org/3/library/asyncio-future.html#asyncio.wrap_future

Что касается части "параллельное будущее в будущее asyncio", вот утилита, которую я использую.

from typing import List, Any
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio


class AsyncThreadPool(ThreadPoolExecutor):
    _futures: List[asyncio.Future]
    _loop: asyncio.AbstractEventLoop

    def __init__(self, max_workers=None):
        super().__init__(max_workers)
        self._futures = []

    def queue(self, fn):
        self._loop = asyncio.get_event_loop()
        fut = self._loop.create_future()
        self._futures.append(fut)
        self.submit(self._entry, fn, fut)

    def queueAsync(self, coroutine):
        def newLoop():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return loop.run_until_complete(coroutine)
        self.queue(newLoop)

    def _entry(self, fn, fut: asyncio.Future):
        try:
            result = fn()
            self._loop.call_soon_threadsafe(fut.set_result, result)
        except Exception as e:
            self._loop.call_soon_threadsafe(fut.set_exception, e)

    async def gather(self) -> List[Any]:
        return await asyncio.gather(*self._futures)

Вы можете использовать это так:

with AsyncThreadPool() as pool:
    # Queue some sync function (will be executed on another thread)
    pool.queue(someHeavySyncFunction)
    # Queue a coroutine that will be executed on a new event loop running on another thread
    pool.queue(otherAsyncFunction())

    # Gather results (non blocking for your current loop)
    res: List[Any] = await pool.gather()
Другие вопросы по тегам