Есть ли способ вызвать асинхронный метод Python из C++?
У нас есть кодовая база в Python, которая использует asyncio и сопрограммы (async
методы и await
s), что я хотел бы сделать, это вызвать один из этих методов из класса C++, который был перенесен в python (используя pybind11)
Допустим, есть этот код:
class Foo:
async def bar(a, b, c):
# some stuff
return c * a
Предполагая, что код вызывается из python, и есть цикл io, обрабатывающий это, в какой-то момент код падает на землю C++, где это bar
метод должен быть вызван - как await
результат этого в C++?
4 ответа
Возможно реализовать сопрограмму Python в C++, но требует некоторой работы. Вам нужно сделать то, что обычно делает для вас интерпретатор (на статических языках компилятор), и преобразовать вашу асинхронную функцию в конечный автомат. Рассмотрим очень простую сопрограмму:
async def coro():
x = foo()
y = await bar()
baz(x, y)
return 42
Вызов coro()
не выполняет ни один из его кода, но он создает ожидаемый объект, который можно запустить, а затем возобновить несколько раз. (Но вы обычно не видите эти операции, потому что они прозрачно выполняются циклом событий.) Ожидаемое может ответить двумя различными способами: 1) приостановить или 2), показывая, что это сделано.
Внутри сопрограммы await
реализует подвеску. Если сопрограмма была реализована с генератором, y = await bar()
будет desugar для:
# pseudo-code for y = await bar()
_bar_iter = bar().__await__()
while True:
try:
_suspend_val = next(_bar_iter)
except StopIteration as _stop:
y = _stop.value
break
yield _suspend_val
Другими словами, await
приостанавливает (дает), пока ожидаемый объект делает. Ожидаемый объект сигнализирует, что это сделано, поднимая StopIteration
и путем контрабанды возвращаемого значения внутри его value
приписывать. Если выход в цикле звучит как yield from
, вы совершенно правы, и именно поэтому await
часто описывается с точки зрения yield from
, Тем не менее, в C++ у нас нет yield
( пока), поэтому мы должны интегрировать вышесказанное в конечный автомат.
Реализовать async def
с нуля нам нужен тип, который удовлетворяет следующим ограничениям:
- при построении ничего не делает - обычно он просто хранит полученные аргументы
- имеет
__await__
метод, который возвращает итеративный, который может быть простоself
; - имеет
__iter__
который возвращает итератор, который снова может бытьself
; - имеет
__next__
метод, вызов которого реализует один шаг конечного автомата, с возвратом, означающим приостановку и поднятиеStopIteration
смысл отделки.
Вышеупомянутый конечный автомат сопрограммы в __next__
будет состоять из трех состояний:
- начальный, когда он вызывает
foo()
функция синхронизации - следующее состояние, когда он продолжает ждать
bar()
сопрограммы до тех пор, пока он приостанавливает (распространяя приостановки) к вызывающей стороне. однаждыbar()
возвращает значение, мы можем сразу перейти к вызовуbaz()
и возвращая значение черезStopIteration
исключение. - конечное состояние, которое просто вызывает исключение, информирующее вызывающего абонента о том, что сопрограмма израсходована.
Итак async def coro()
приведенное выше определение можно рассматривать как синтаксический сахар для следующего:
class coro:
def __init__(self):
self._state = 0
def __iter__(self):
return self
def __await__(self):
return self
def __next__(self):
if self._state == 0:
self._x = foo()
self._bar_iter = bar().__await__()
self._state = 1
if self._state == 1:
try:
suspend_val = next(self._bar_iter)
# propagate the suspended value to the caller
# don't change _state, we will return here for
# as long as bar() keeps suspending
return suspend_val
except StopIteration as stop:
# we got our value
y = stop.value
# since we got the value, immediately proceed to
# invoking `baz`
baz(self._x, y)
self._state = 2
# tell the caller that we're done and inform
# it of the return value
raise StopIteration(42)
# the final state only serves to disable accidental
# resumption of a finished coroutine
raise RuntimeError("cannot reuse already awaited coroutine")
Мы можем проверить, что наша "сопрограмма" работает с использованием реальной асинхронности:
>>> class coro:
... (definition from above)
...
>>> def foo():
... print('foo')
... return 20
...
>>> async def bar():
... print('bar')
... return 10
...
>>> def baz(x, y):
... print(x, y)
...
>>> asyncio.run(coro())
foo
bar
20 10
42
Оставшаяся часть должна написать coro
Класс в Python/C или в Pybind11.
Это не pybind11, но вы можете вызвать асинхронную функцию напрямую из C. Вы просто добавляете обратный вызов в будущее, используя add_done_callback. Я предполагаю, что pybind11 позволяет вам вызывать функции python, поэтому шаги будут такими же:
https://github.com/MarkReedZ/mrhttp/blob/master/src/mrhttp/internals/protocol.c
result = protocol_callPageHandler(self, r->func, request))
Теперь результатом асинхронной функции является будущее. Как и в python, вам нужно вызвать create_task, используя получающееся будущее:
PyObject *task;
if(!(task = PyObject_CallFunctionObjArgs(self->create_task, result, NULL))) return NULL;
И тогда вам нужно добавить обратный вызов, используя add_done_callback:
add_done_callback = PyObject_GetAttrString(task, "add_done_callback")
PyObject_CallFunctionObjArgs(add_done_callback, self->task_done, NULL)
self-> task_done - это функция C, зарегистрированная в python, которая будет вызываться после выполнения задачи.
Одиночные обратные вызовы
Во-первых, получите базовый обратный вызов, работающий из pybind11. Вот несколько ссылок:
Затем получите работающий пример Asyncio Future POC/. Видеть:
Теперь, когда вы выяснили эти два основных механизма, объедините их!
Ключевой «хитростью» является вызов функции pybind «setCallback» из Python и передача ей функции вашего будущего объекта . Теперь у вас будет обратный вызов из C++, передаваемый в asyncio Future!
Однако если вы используете многопоточность в своем C++, вы можете столкнуться с проблемой... Будущее значение может быть установлено, но цикл событий asyncio может не ответить. Чтобы решить эту проблему, я рекомендую добавить еще один обратный вызов, чтобы «разбудить» цикл. См.: /questions/62749133/kak-ya-mogu-razbudit-tsikl-sobyitij-chtobyi-uvedomit-ego-o-zavershenii-future-iz/66191760#66191760
НЕПРОВЕРАННЫЕ ПРИМЕРЫ
С++
#include <string>
#include <thread>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/functional.h>
namespace py = pybind11;
typedef std::function<void(const std::string &)> MessageHandler;
typedef std::function<void()> WakeHandler;
class MyHelper
{
public:
MyHelper(){}
~MyHelper(){}
void setMessageHandler( const MessageHandler handler )
{ messageHandler_ = handler; }
void setWakeHandler( const WakeHandler handler )
{ wakeHandler_ = handler; }
void test()
{
std::thread t([this](){
std::this_thread::sleep_for( std::chrono::seconds( 3 ) );
if( messageHandler_ ) messageHandler_( "Hello from C++!" );
if( wakeHandler_ ) wakeHandler_();
});
t.detach();
}
static void pybindInit( py::module &m )
{
py::class_<MyHelper>( m, "MyHelper" )
.def( py::init<>() )
.def( "setMessageHandler", &MyClass::setMessageHandler,
py::call_guard<py::gil_scoped_release>() )
.def( "setWakeHandler" , &MyClass::setWakeHandler,
py::call_guard<py::gil_scoped_release>() )
.def( "test" , &MyClass::test,
py::call_guard<py::gil_scoped_release>() )
;
}
private:
MessageHandler messageHandler_;
WakeHandler wakeHandler_;
};
Питон:
import MyHelper
async def main( helper ):
global event_loop
event_loop = asyncio.get_running_loop()
helper.test()
await asyncio.gather( get_help( helper ), other_coro() )
print( "Success!" )
def wake():
asyncio.run_coroutine_threadsafe( asyncio.sleep( 0 ), event_loop )
async def get_help( helper ):
future = event_loop.create_future()
helper.setMessageHandler( future.set_result )
helper.setWakeHandler( wake )
print( await future )
async def other_coro():
for i in range(5):
await asyncio.sleep( 1 )
print( "some other python work...." )
if __name__ == "__main__": asyncio.run( main( MyHelper() ) )
Повторяющиеся обратные вызовы
Хорошо... так что это может быть и не так, в зависимости от того, что именно вам нужно.... A Future'sset_result
уволить можно только один раз. :( Кроме того, вы можете захотеть «отменить» его из C++ или вернуть исключение...
Если ваша цель состоит в том, чтобы иногда обращаться к C++ «за помощью» и асинхронно получать результат, то то, что я описал/продемонстрировал, должно работать. Но если вы хотите, чтобы C++ неоднократно отправлял «события» или асинхронные «сообщения» в ваш скрипт Python, когда захочет, вам придется проделать немного больше работы с обеих сторон, чтобы расширить эту простую конструкцию. Короче говоря, со стороны Python вам захочется продолжать создавать новые будущие объекты и передавать их в C++ для каждого обратного вызова. Вы также должны быть уверены, что на стороне C++ каждый из этих обратных вызовов будет использоваться только один раз и, при необходимости, блокироваться с этой стороны до тех пор, пока не будет назначен новый и, следовательно, «готовый» на стороне Python.
Для таких вещей, если я не хочу углубляться в API CPython, я просто пишу свои вещи на Python и вызываю это, используя pybind
с интерфейсом Python.
Пример: https://github.com/RobotLocomotion/drake/blob/a7700d3/bindings/pydrake/__init__.py https://github.com/RobotLocomotion/drake/blob/a7700d3/bindings/pydrake/pydrake_pybind.h#L359
Рендеринг на этот вариант использования, возможно, вы можете сделать:
# cpp_helpers.py
def await_(obj):
return await obj
py::object py_await = py::module::import("cpp_helpers").attr("await_");
auto result = py::cast<MyResult>(py_await(py_obj));
Однако это, скорее всего, будет менее производительным, чем приведенные выше решения.