Как я могу «разбудить» цикл событий, чтобы уведомить его о завершении Future из другого потока?
При использовании python async/asyncio я часто создаю и завершаю
Если я не завершаю эти фьючерсы в потоке, который запускает цикл событий, или через функцию, которая уведомляет этот цикл о завершении, цикл событий часто не «замечает», что фьючерсы завершены.
Есть ли способ «уведомить» цикл событий о том, что он должен проверить Future на завершение, если это будущее было подготовлено (через set_result) извне?
Почему я это спрашиваю
Потоки, которым нужны готовые фьючерсы, должны: а) иметь очень низкую задержку и б) проверять, было ли готово фьючерс, синхронно, позже (например, через
Цикл событий, обрабатывающий фьючерсы, не обязательно должен иметь малую задержку при получении уведомления о том, что они готовы — он может быть уведомлен с опозданием на несколько миллисекунд.
В идеале должен быть производительный способ уведомить цикл событий о том, что Future был подготовлен после его синхронной подготовки в потоке.
Даже если это невозможно, цикл обработки событий может опрашивать готовность с интервалом, пока фьючерсы синхронно готовятся как можно быстрее в потоках.
Что я пробовал
«Правильный» способ решить эту проблему, например:
def do_in_thread(future):
future.get_loop().call_soon_threasafe(future.set_result, "the result")
Это надежно уведомляет цикл событий о готовности к будущему, но не работает по двум причинам:
- Он имеет значительные (в 8-10 раз) накладные расходы по сравнению со звонком.
в моих бенчмарках. - Он не готовит Future до тех пор, пока не запустится цикл событий, а это значит, что я не могу надежно проверить, готово ли Future, что мне нужно сделать. Например, это не сработает:
def do_in_thread(future):
future.get_loop().call_soon_threasafe(future.set_result, "the result")
assert future.done() # Fails
Одна вещь, которая, похоже , работает, — это уведомить цикл обработки событий, преднамеренно отказав второму вызову via и проглотив его, например:
def ensure_result(f, res):
try:
f.set_result(res)
except InvalidStateError:
pass
def in_thread(fut: Future):
fut.set_result("the result")
fut.get_loop().call_soon_threadsafe(ensure_result, fut, "the result")
У этого все еще есть накладные расходы, но я мог бы убрать накладные расходы на вызов
- Это надежно работает? Является
провал с гарантированно уведомляет цикл событий о том, что данное Future может вернуться из , или это недокументированная деталь реализации, на которую я полагаюсь? - Есть ли лучший способ добиться этого периодического пробуждения, который не требует от меня отслеживания/опроса таких фьючерсов?
В идеальном мире было бы
1 ответ
Я столкнулся с той же существенной проблемой и несколько дней бился головой о стену. Я использовал обратные вызовы из расширения C++, чтобы установить будущий результат с помощью кода, работающего в другом системном потоке (созданном на стороне C). Я мог бы установить этот будущий результат, но циклу событий было все равно, если бы у меня не было других сопрограмм, поддерживающих его «живым» в этом потоке Python, и даже тогда он часто работал медленно.
Я использовал это концептуальное решение (публикация вашего примера кода), чтобы решить эту проблему:
def in_thread(fut: Future):
fut.set_result("the result")
asyncio.run_coroutine_threadsafe( asyncio.sleep(0), fut.get_loop() )
Результатом стала задержка примерно в 100 микросекунд (не милли, а микро) между установкой будущего и получением цикла событий «пробуждения» и его обработки.