Есть ли способ вызвать асинхронный метод Python из C++?

У нас есть кодовая база в Python, которая использует asyncio и сопрограммы (async методы и awaits), что я хотел бы сделать, это вызвать один из этих методов из класса C++, который был перенесен в python (используя pybind11)

Допустим, есть этот код:

class Foo:
  async def bar(a, b, c):
    # some stuff
    return c * a

Предполагая, что код вызывается из python, и есть цикл io, обрабатывающий это, в какой-то момент код падает на землю C++, где это bar метод должен быть вызван - как await результат этого в C++?

4 ответа

Возможно реализовать сопрограмму Python в C++, но требует некоторой работы. Вам нужно сделать то, что обычно делает для вас интерпретатор (на статических языках компилятор), и преобразовать вашу асинхронную функцию в конечный автомат. Рассмотрим очень простую сопрограмму:

async def coro():
    x = foo()
    y = await bar()
    baz(x, y)
    return 42

Вызов coro() не выполняет ни один из его кода, но он создает ожидаемый объект, который можно запустить, а затем возобновить несколько раз. (Но вы обычно не видите эти операции, потому что они прозрачно выполняются циклом событий.) Ожидаемое может ответить двумя различными способами: 1) приостановить или 2), показывая, что это сделано.

Внутри сопрограммы await реализует подвеску. Если сопрограмма была реализована с генератором, y = await bar() будет desugar для:

# pseudo-code for y = await bar()

_bar_iter = bar().__await__()
while True:
    try:
        _suspend_val = next(_bar_iter)
    except StopIteration as _stop:
        y = _stop.value
        break
    yield _suspend_val

Другими словами, await приостанавливает (дает), пока ожидаемый объект делает. Ожидаемый объект сигнализирует, что это сделано, поднимая StopIterationи путем контрабанды возвращаемого значения внутри его value приписывать. Если выход в цикле звучит как yield from, вы совершенно правы, и именно поэтому await часто описывается с точки зрения yield from, Тем не менее, в C++ у нас нет yield ( пока), поэтому мы должны интегрировать вышесказанное в конечный автомат.

Реализовать async def с нуля нам нужен тип, который удовлетворяет следующим ограничениям:

  • при построении ничего не делает - обычно он просто хранит полученные аргументы
  • имеет __await__ метод, который возвращает итеративный, который может быть просто self;
  • имеет __iter__ который возвращает итератор, который снова может быть self;
  • имеет __next__ метод, вызов которого реализует один шаг конечного автомата, с возвратом, означающим приостановку и поднятие StopIteration смысл отделки.

Вышеупомянутый конечный автомат сопрограммы в __next__ будет состоять из трех состояний:

  1. начальный, когда он вызывает foo() функция синхронизации
  2. следующее состояние, когда он продолжает ждать bar() сопрограммы до тех пор, пока он приостанавливает (распространяя приостановки) к вызывающей стороне. однажды bar() возвращает значение, мы можем сразу перейти к вызову baz() и возвращая значение через StopIteration исключение.
  3. конечное состояние, которое просто вызывает исключение, информирующее вызывающего абонента о том, что сопрограмма израсходована.

Итак async def coro() приведенное выше определение можно рассматривать как синтаксический сахар для следующего:

class coro:
    def __init__(self):
        self._state = 0

    def __iter__(self):
        return self

    def __await__(self):
        return self

    def __next__(self):
        if self._state == 0:
            self._x = foo()
            self._bar_iter = bar().__await__()
            self._state = 1

        if self._state == 1:
            try:
                suspend_val = next(self._bar_iter)
                # propagate the suspended value to the caller
                # don't change _state, we will return here for
                # as long as bar() keeps suspending
                return suspend_val
            except StopIteration as stop:
                # we got our value
                y = stop.value
            # since we got the value, immediately proceed to
            # invoking `baz`
            baz(self._x, y)
            self._state = 2
            # tell the caller that we're done and inform
            # it of the return value
            raise StopIteration(42)

        # the final state only serves to disable accidental
        # resumption of a finished coroutine
        raise RuntimeError("cannot reuse already awaited coroutine")

Мы можем проверить, что наша "сопрограмма" работает с использованием реальной асинхронности:

>>> class coro:
... (definition from above)
...
>>> def foo():
...     print('foo')
...     return 20
... 
>>> async def bar():
...     print('bar')
...     return 10
... 
>>> def baz(x, y):
...     print(x, y)
... 
>>> asyncio.run(coro())
foo
bar
20 10
42

Оставшаяся часть должна написать coro Класс в Python/C или в Pybind11.

Это не pybind11, но вы можете вызвать асинхронную функцию напрямую из C. Вы просто добавляете обратный вызов в будущее, используя add_done_callback. Я предполагаю, что pybind11 позволяет вам вызывать функции python, поэтому шаги будут такими же:

https://github.com/MarkReedZ/mrhttp/blob/master/src/mrhttp/internals/protocol.c

result = protocol_callPageHandler(self, r->func, request))

Теперь результатом асинхронной функции является будущее. Как и в python, вам нужно вызвать create_task, используя получающееся будущее:

PyObject *task;
if(!(task = PyObject_CallFunctionObjArgs(self->create_task, result, NULL))) return NULL;

И тогда вам нужно добавить обратный вызов, используя add_done_callback:

add_done_callback = PyObject_GetAttrString(task, "add_done_callback")
PyObject_CallFunctionObjArgs(add_done_callback, self->task_done, NULL)

self-> task_done - это функция C, зарегистрированная в python, которая будет вызываться после выполнения задачи.

Одиночные обратные вызовы

Во-первых, получите базовый обратный вызов, работающий из pybind11. Вот несколько ссылок:

Затем получите работающий пример Asyncio Future POC/. Видеть:

Теперь, когда вы выяснили эти два основных механизма, объедините их!

Ключевой «хитростью» является вызов функции pybind «setCallback» из Python и передача ей функции вашего будущего объекта . Теперь у вас будет обратный вызов из C++, передаваемый в asyncio Future!

Однако если вы используете многопоточность в своем C++, вы можете столкнуться с проблемой... Будущее значение может быть установлено, но цикл событий asyncio может не ответить. Чтобы решить эту проблему, я рекомендую добавить еще один обратный вызов, чтобы «разбудить» цикл. См.: /questions/62749133/kak-ya-mogu-razbudit-tsikl-sobyitij-chtobyi-uvedomit-ego-o-zavershenii-future-iz/66191760#66191760

НЕПРОВЕРАННЫЕ ПРИМЕРЫ

С++

      #include <string>
#include <thread>

#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/functional.h>
namespace py = pybind11;

typedef std::function<void(const std::string &)> MessageHandler;
typedef std::function<void()> WakeHandler;

class MyHelper
{
public:
    MyHelper(){}
    ~MyHelper(){}

    void setMessageHandler( const MessageHandler handler )
    { messageHandler_ = handler; }

    void setWakeHandler( const WakeHandler handler )
    { wakeHandler_ = handler; }

    void test()
    {   
        std::thread t([this](){
            std::this_thread::sleep_for( std::chrono::seconds( 3 ) );
            if( messageHandler_ ) messageHandler_( "Hello from C++!" );
            if( wakeHandler_ ) wakeHandler_();
        });
        t.detach();    
    }

    static void pybindInit( py::module &m )
    {
        py::class_<MyHelper>( m, "MyHelper" )
            .def( py::init<>() )
            .def( "setMessageHandler", &MyClass::setMessageHandler,
                  py::call_guard<py::gil_scoped_release>() )
            .def( "setWakeHandler"   , &MyClass::setWakeHandler,
                  py::call_guard<py::gil_scoped_release>() )
            .def( "test"             , &MyClass::test,
                  py::call_guard<py::gil_scoped_release>() )
        ;
    }

private:
    MessageHandler messageHandler_;
    WakeHandler wakeHandler_;
};

Питон:

      import MyHelper

async def main( helper ):
    global event_loop
    event_loop = asyncio.get_running_loop()
    helper.test()
    await asyncio.gather( get_help( helper ), other_coro() )
    print( "Success!" )

def wake():
    asyncio.run_coroutine_threadsafe( asyncio.sleep( 0 ), event_loop )

async def get_help( helper ):        
    future = event_loop.create_future()
    helper.setMessageHandler( future.set_result )
    helper.setWakeHandler( wake )
    print( await future )

async def other_coro():
    for i in range(5):
        await asyncio.sleep( 1 )
        print( "some other python work...." )

if __name__ == "__main__": asyncio.run( main( MyHelper() ) )

Повторяющиеся обратные вызовы

Хорошо... так что это может быть и не так, в зависимости от того, что именно вам нужно.... A Future'sset_resultуволить можно только один раз. :( Кроме того, вы можете захотеть «отменить» его из C++ или вернуть исключение...

Если ваша цель состоит в том, чтобы иногда обращаться к C++ «за помощью» и асинхронно получать результат, то то, что я описал/продемонстрировал, должно работать. Но если вы хотите, чтобы C++ неоднократно отправлял «события» или асинхронные «сообщения» в ваш скрипт Python, когда захочет, вам придется проделать немного больше работы с обеих сторон, чтобы расширить эту простую конструкцию. Короче говоря, со стороны Python вам захочется продолжать создавать новые будущие объекты и передавать их в C++ для каждого обратного вызова. Вы также должны быть уверены, что на стороне C++ каждый из этих обратных вызовов будет использоваться только один раз и, при необходимости, блокироваться с этой стороны до тех пор, пока не будет назначен новый и, следовательно, «готовый» на стороне Python.

Для таких вещей, если я не хочу углубляться в API CPython, я просто пишу свои вещи на Python и вызываю это, используя pybind с интерфейсом Python.

Пример: https://github.com/RobotLocomotion/drake/blob/a7700d3/bindings/pydrake/__init__.py https://github.com/RobotLocomotion/drake/blob/a7700d3/bindings/pydrake/pydrake_pybind.h#L359

Рендеринг на этот вариант использования, возможно, вы можете сделать:

# cpp_helpers.py
def await_(obj):
    return await obj
py::object py_await = py::module::import("cpp_helpers").attr("await_");
auto result = py::cast<MyResult>(py_await(py_obj));

Однако это, скорее всего, будет менее производительным, чем приведенные выше решения.

Другие вопросы по тегам