Проблема уничтожения вложенных сопрограмм в C++

Я экспериментирую с clang-5 и его реализацией TS сопрограммы. Я пытаюсь использовать его с boost asio, но сталкиваюсь с проблемами, похоже, мой фрейм стека сопрограмм разрушается дважды, но мне не удается понять, что я делаю неправильно.

Это мой упрощенный сценарий воспроизведения:

#include <experimental/coroutine>
#include <boost/asio.hpp>
#include <iostream>

using namespace boost::asio;

inline auto async_connect(ip::tcp::socket& s, const ip::tcp::endpoint& ep)
{
    struct Awaiter
    {
        ip::tcp::socket& s;
        ip::tcp::endpoint ep;
        boost::system::error_code ec;
        bool ready = false;

        Awaiter(ip::tcp::socket& s, const ip::tcp::endpoint& ep) : s(s) , ep(ep) {}
        bool await_ready() { return ready; }
        void await_resume()
        {
            if (ec)
            {
                throw boost::system::system_error(ec);
            }
        }
        void await_suspend(std::experimental::coroutine_handle<> coro)
        {
            s.async_connect(this->ep, [this, coro] (auto ec) mutable {
                this->ready = true;
                this->ec = ec;
                std::cout << "Connect ready: resume " << coro.address() << "\n";
                coro.resume();
            });

            std::cout << "Connect initiated\n";
        }
    };

    return Awaiter(s, ep);
}

struct Future
{
    struct promise_type
    {
        bool _ready = false;
        std::experimental::coroutine_handle<> _waiter = nullptr;

        Future get_return_object()
        {
            auto coro = std::experimental::coroutine_handle<promise_type>::from_promise(*this);
            std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << coro.address() << "]\n";
            return Future(coro);
        }

        auto initial_suspend()
        {
            std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
            return std::experimental::suspend_never();
        }

        auto final_suspend()
        {
            std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
            return std::experimental::suspend_always();
        }

        void return_void()
        {
            _ready = true;
            if (_waiter)
            {
                std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "] resume waiter[" << _waiter.address() << "]\n ";
                _waiter.resume();
            }
        }

        void unhandled_exception()
        {
            std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
            std::terminate();
        }
    };

    Future() = default;
    Future(const Future&) = delete;
    Future& operator=(const Future&) = delete;
    Future& operator=(Future&& other)
    {
        _coro = other._coro;
        other._coro = nullptr;
        return *this;
    }

    explicit Future(std::experimental::coroutine_handle<promise_type> coro)
    : _coro(coro)
    {
        std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
    }

    Future(Future&& f)
    : _coro(f._coro)
    {
        f._coro = nullptr;
    }

    ~Future()
    {
        if (_coro)
        {
            std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
            _coro.destroy();
            //_coro = nullptr;
        }
    }

    bool await_ready()
    {
        assert(_coro);
        std::cout << __FUNCTION__ << ":" << __LINE__ << ": ready " << _coro.promise()._ready << std::endl;
        return _coro.promise()._ready;
    }

    void await_suspend(std::experimental::coroutine_handle<> callerCoro)
    {
        std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "] waiter [" << callerCoro.address() << "]\n";
        _coro.promise()._waiter = callerCoro;
    }

    void await_resume()
    {
        std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
    }

    void* address()
    {
        return _coro.address();
    }

    std::experimental::coroutine_handle<promise_type> _coro = nullptr;
};

struct RaiiCheck
{
    ~RaiiCheck()
    {
        std::cout << "******** ~RaiiCheck " << this << " ********\n";
    }
};

Future asioConnect(ip::tcp::socket& s, const ip::tcp::endpoint& ep)
{
    RaiiCheck rc;
    s.open(ep.protocol());
    co_await async_connect(s, ep);
}

template <typename Awaitable>
Future waitForIt(io_service& io, Awaitable&& awaitable)
{
    co_await awaitable;
    io.stop();
}

static Future performRequest(io_service& io)
{
    try
    {
        auto addr = ip::tcp::endpoint(ip::address::from_string("8.8.8.8"), 53);
        ip::tcp::socket socket(io);

        // Works
        //socket.open(addr.protocol());
        //co_await async_connect(socket, addr);

        // Crashes
        std::cout << "asioConnect\n";
        co_await asioConnect(socket, addr);
        assert(socket.is_open());
    }
    catch (const boost::system::system_error& e)
    {
        std::cerr << "Failed to parse http response: " << e.what() << "\n";
    }
}

int main(int, char**)
{
    io_service io;

    auto task = performRequest(io);

    Future fut;
    io.post([&] () {
        fut = waitForIt(io, task);
    });

    io.run();
}

Он генерирует следующий вывод:

get_return_object:51 [0x7fbbeed03c00]
Future:99 [0x7fbbeed03c00]
initial_suspend:57 [0x7fbbeed03c00]
asioConnect
get_return_object:51 [0x7fbbeed03d50]
Future:99 [0x7fbbeed03d50]
initial_suspend:57 [0x7fbbeed03d50]
Connect initiated
await_suspend:129 [0x7fbbeed03d50] waiter [0x7fbbeed03c00]
get_return_object:51 [0x7fbbeec028a0]
Future:99 [0x7fbbeec028a0]
initial_suspend:57 [0x7fbbeec028a0]
await_suspend:129 [0x7fbbeed03c00] waiter [0x7fbbeec028a0]
Connect ready: resume 0x7fbbeed03d50
******** ~RaiiCheck 0x7fbbeed03e14 ********
return_void:69 [0x7fbbeed03d50] resume waiter[0x7fbbeed03c00]
await_resume:135 [0x7fbbeed03d50]
~Future:113 [0x7fbbeed03d50]
******** ~RaiiCheck 0x7fbbeed03e14 ********
return_void:69 [0x7fbbeed03c00] resume waiter[0x7fbbeec028a0]
await_resume:135 [0x7fbbeed03c00]
return_void:69 [0x7fbbeec028a0]
final_suspend:63 [0x7fbbeec028a0]
final_suspend:63 [0x7fbbeed03c00]
final_suspend:63 [0x7fbbeed03d50]
~Future:113 [0x7fbbeec028a0]
~Future:113 [0x7fbbeed03c00]
corotest(74949,0x7fffc163c3c0) malloc: *** error for object 0x7fbbeed03d50: incorrect checksum for freed object - object was probably modified after being freed.
*** set a breakpoint in malloc_error_break to debug
Abort trap: 6

Как вы можете видеть, объект RaiiCheck в функции asioConnect уничтожается дважды.

Объект Future внутри функции asioConnect имеет дескриптор coroutine_handle с адресом 0x7fbbeed03d50. Когда эта сопрограмма возобновляется в обратном вызове завершения s.async_connect, объект RaiiCheck уничтожается впервые. Затем я вижу, как вызывается функция return_void, которая возобновляет родительскую сопрограмму, которая, похоже, снова возобновляет внутреннюю сопрограмму, вызывая очередное уничтожение объекта RaiiCheck.

Похоже, это связано с моим вложением сопрограмм, потому что это работает, если я co_await непосредственно на вызов async_connect вместо того, чтобы помещать вызов async_connect в другую сопрограмму.

Любая подсказка для решения этой проблемы высоко ценится.

1 ответ

Решение

Вы должны возобновить работу официанта только после final_suspend(), не в return_void()потому что кадр сопрограммы не разматывается раньше final_suspend(), у вас будут проблемы с вызовом destroy() прежде чем это наконец приостановлено.

Исправленный код для final_suspend() выглядит так:

auto final_suspend()
{
    struct awaiter
    {
        std::experimental::coroutine_handle<> _waiter;

        bool await_ready() { return false; }

        void await_suspend(std::experimental::coroutine_handle<> coro)
        {
            if (_waiter)
            {
                _waiter();
            }
        }

        void await_resume() {}
    };
    return awaiter{_waiter};
}
Другие вопросы по тегам