boost::beast websocket асинхронное чтение и запись потеряны

Я пытаюсь прочитать API веб-сокета RocketChat, используя boost::beast. Это работает, когда я читаю его, используя синхронные примеры, но при использовании функций async_*, оказывается, соединение закрыто.

Оскорбительный код в вопросе:

#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/connect.hpp>
#include <iostream>
#include <deque>


class connection
{
    public:
        connection(boost::asio::io_context &io_context, boost::asio::ssl::context &ssl_context) :
            _io_context{ io_context },
            _resolver{ io_context },
            _websocket{ io_context, ssl_context },
            _dynamic_buffer{ boost::asio::dynamic_buffer(_read_buffer) }
        {}

        void connect(std::string_view host)
        {
            auto addresses = _resolver.resolve(host, "https");

            boost::asio::connect(_websocket.lowest_layer(), addresses);
            _websocket.next_layer().handshake(boost::asio::ssl::stream_base::client);
            _websocket.handshake(boost::beast::string_view{ host.data(), host.size() }, "/websocket");

            read_message();
        }

        void send(std::string &&message)
        {
            _write_buffer.push_back(std::move(message));

            if (_write_buffer.size() == 1) {
                send_message();
            }

            _websocket.write(boost::asio::buffer(message));
        }
    private:
        using tcp_socket = boost::asio::ip::tcp::socket;
        using ssl_socket = boost::asio::ssl::stream<tcp_socket>;

        void read_message()
        {
            _websocket.async_read(_dynamic_buffer, [this](const boost::system::error_code &error, size_t transferred) {
                if (error == boost::asio::error::operation_aborted) {
                    std::cout << "read aborted" << std::endl;
                    return;
                } else if (error) {
                    std::cout << "read error: " << error.message() << std::endl;
                    return;
                }

                std::cout << " >> " << _read_buffer << std::endl;

                _read_buffer.clear();
                read_message();
            });
        }

        void send_message()
        {
            if (_write_buffer.empty()) {
                return;
            }

            _websocket.async_write(boost::asio::buffer(_write_buffer.front()), [this](const boost::system::error_code &error, size_t transferred) {
                if (error == boost::asio::error::operation_aborted) {
                    std::cout << "write aborted" << std::endl;
                    return;
                } else if (error) {
                    std::cout << "write error: " << error.message() << std::endl;
                    return;
                }

                if (transferred != _write_buffer.front().size()) {
                    std::cout << "Wrote " << transferred << " bytes instead of the expected " << _write_buffer.front().size() << std::endl;
                    return;
                }

                std::cout << " << " << _write_buffer.front() << std::endl;

                _write_buffer.pop_front();
                send_message();
            });
        }

        boost::asio::io_context                             &_io_context;
        boost::asio::ip::tcp::resolver                      _resolver;
        boost::beast::websocket::stream<ssl_socket>         _websocket;
        std::string                                         _read_buffer;
        decltype(boost::asio::dynamic_buffer(_read_buffer)) _dynamic_buffer;
        std::deque<std::string>                             _write_buffer;
};

int main()
{
    boost::asio::io_context     io_context;
    boost::asio::ssl::context   ssl_context { boost::asio::ssl::context::sslv23_client  };
    connection                  connection  { io_context, ssl_context                   };

    connection.connect("open.rocket.chat");
    send("{\"msg\":\"connect\",\"version\":\"1\",\"support\":[\"1\"]}");
    io_context.run();
}

Выполнение этого кода приводит к следующему выводу:

 >> {"server_id":"0"}
 << {"msg":"connect","version":"1","support":["1"]}
read error: The WebSocket stream was gracefully closed at both endpoints

Если я не отправляю исходное сообщение после подключения, соединение не закрывается. Если вместо выполнения асинхронных вызовов я использую синхронные версии, они также не закрываются.

Это наверное что-то очень глупое я делаю не так. Если я правильно интерпретирую документы, то должно быть допустимо инициировать как чтение, так и запись в веб-сокете, если они сделаны из одной и той же цепи, и, поскольку у меня есть только один поток, у меня должна быть неявная цепь.

Что я делаю неправильно?

0 ответов

Другие вопросы по тегам