Как я могу «разбудить» цикл событий, чтобы уведомить его о завершении Future из другого потока?

При использовании python async/asyncio я часто создаю и завершаю объекты из потоков, которые не являются потоком, выполняющим цикл обработки событий.

Если я не завершаю эти фьючерсы в потоке, который запускает цикл событий, или через функцию, которая уведомляет этот цикл о завершении, цикл событий часто не «замечает», что фьючерсы завершены.

Есть ли способ «уведомить» цикл событий о том, что он должен проверить Future на завершение, если это будущее было подготовлено (через set_result) извне?

Почему я это спрашиваю

Потоки, которым нужны готовые фьючерсы, должны: а) иметь очень низкую задержку и б) проверять, было ли готово фьючерс, синхронно, позже (например, через ).

Цикл событий, обрабатывающий фьючерсы, не обязательно должен иметь малую задержку при получении уведомления о том, что они готовы — он может быть уведомлен с опозданием на несколько миллисекунд.

В идеале должен быть производительный способ уведомить цикл событий о том, что Future был подготовлен после его синхронной подготовки в потоке.

Даже если это невозможно, цикл обработки событий может опрашивать готовность с интервалом, пока фьючерсы синхронно готовятся как можно быстрее в потоках.

Что я пробовал

«Правильный» способ решить эту проблему, например:

      def do_in_thread(future):
    future.get_loop().call_soon_threasafe(future.set_result, "the result")

Это надежно уведомляет цикл событий о готовности к будущему, но не работает по двум причинам:

  1. Он имеет значительные (в 8-10 раз) накладные расходы по сравнению со звонком. в моих бенчмарках.
  2. Он не готовит Future до тех пор, пока не запустится цикл событий, а это значит, что я не могу надежно проверить, готово ли Future, что мне нужно сделать. Например, это не сработает:
      def do_in_thread(future):
    future.get_loop().call_soon_threasafe(future.set_result, "the result")
    assert future.done()  # Fails

Одна вещь, которая, похоже , работает, — это уведомить цикл обработки событий, преднамеренно отказав второму вызову via и проглотив его, например:

      def ensure_result(f, res):
    try:
        f.set_result(res)
    except InvalidStateError:
        pass


def in_thread(fut: Future):
    fut.set_result("the result")
    fut.get_loop().call_soon_threadsafe(ensure_result, fut, "the result")

У этого все еще есть накладные расходы, но я мог бы убрать накладные расходы на вызов отслеживая фьючерсы в структуре данных с общим потоком и опрашивая вызовы время от времени. Однако я все еще не уверен:

  1. Это надежно работает? Является провал с гарантированно уведомляет цикл событий о том, что данное Future может вернуться из , или это недокументированная деталь реализации, на которую я полагаюсь?
  2. Есть ли лучший способ добиться этого периодического пробуждения, который не требует от меня отслеживания/опроса таких фьючерсов?

В идеальном мире было бы или же метод, который бы достиг этого эффективно, но я не знаю ни одного.

1 ответ

Я столкнулся с той же существенной проблемой и несколько дней бился головой о стену. Я использовал обратные вызовы из расширения C++, чтобы установить будущий результат с помощью кода, работающего в другом системном потоке (созданном на стороне C). Я мог бы установить этот будущий результат, но циклу событий было все равно, если бы у меня не было других сопрограмм, поддерживающих его «живым» в этом потоке Python, и даже тогда он часто работал медленно.

Я использовал это концептуальное решение (публикация вашего примера кода), чтобы решить эту проблему:

      def in_thread(fut: Future):
    fut.set_result("the result")
    asyncio.run_coroutine_threadsafe( asyncio.sleep(0), fut.get_loop() )

Результатом стала задержка примерно в 100 микросекунд (не милли, а микро) между установкой будущего и получением цикла событий «пробуждения» и его обработки.

Другие вопросы по тегам