Как бороться с дополнительными символами, читаемыми в ASIO streambuf?

В большинстве библиотек парсеры работают только над std::istream или один непрерывный буфер. Эти парсеры читают istream до конца, а не до конца документа. Даже если есть хороший boost::asio::streambuf, которые могут быть использованы с istreamСуществует проблема с чтением и фиксацией только одного кадра. Функции как read_until передают все, что они читают, и если они читают фрагмент следующего кадра, заполнение при разборе завершается неудачно.

Этот насмешливый пример на Колиру показывает проблему.

Предполагая, что нам нужно эффективное решение без копирования буферов, я должен убедиться, что конец потока является правильным концом документа. Мое текущее решение состоит в том, чтобы сканировать данные и умножать фиксацию / потребление в течение одного подготовленного буфера:

size_t read_some_frames( boost::asio::streambuf& strbuf, 
                         std::function< void(istream&) > parser ) {
        auto buffers= strbuf.prepare( 1024 );
        size_t read= bad_case_of_read_some( buffers );

        vector< std::pair< size_t, size_t > > frames;
        std::pair< size_t, size_t > leftover= scanForFrames( 
                    buffers_begin(buffers), 
                    buffers_begin(buffers)+read, 
                    frames, '\0' );

        for( auto const& frame: frames ) {
            cout << "Frame size: " << frame.first 
                      << " skip: " << frame.second << endl;
            strbuf.commit( frame.first );
            strbuf.consume( frame.second );
            iostream stream( &strbuf );
            parser( stream );
        }
        cout << "Unfinished frame size: " << leftover.first 
                             << " skip:" << leftover.second << endl;
        strbuf.commit( leftover.first );
        strbuf.consume( leftover.second );
        return read;
}

Жить на Колиру

Согласно документации, это неправильно. Я думаю, что этот код работает, потому что этот вызов commit и потребление не освобождает внутренние буферы. Как-то мне нужно разобраться с этим.

Каковы возможные решения?

2 ответа

Решение

В то время как read_until() операции фиксируют все данные, прочитанные во входную последовательность streambuf, они возвращают bytes_transferred значение, содержащее количество байтов до и включая первый разделитель. По сути, это обеспечивает размер кадра, и можно ограничить istream читать только часть streambuf входная последовательность:

  • Использование пользовательских istream это ограничивает количество байтов, считываемых из потокового буфера. Один из самых простых способов сделать это - использовать Boost.IOStream's boost::iostreams::stream и реализовать модель концепции источника.
  • Создать кастом streambuf что происходит от Boost.Asio's streambuf, Чтобы ограничить количество байтов, считываемых из доступной входной последовательности, пользовательские функции должны будут манипулировать концом входной последовательности. Кроме того, обычай streambuf нужно будет справиться с недостаточным объемом.

изготовленный на заказ Source для Boost.IOStream

Boost.IOStream-х boost::iostreams::stream объект делегирует операции ввода-вывода на устройство. Устройство - это пользовательский код, который реализует модель различных концепций Boost.IOStream. В этом случае концепция Source, которая обеспечивает доступ для чтения к последовательности символов, является единственной необходимой. Кроме того, когда boost::iostreams::stream использует исходное устройство, оно будет наследоваться от std::basic_istream,

В следующем коде asio_streambuf_input_device является моделью концепции Source, которая читает из потокового буфера Boost.Asio. Когда определенное количество байтов было прочитано, asio_streambuf_input_device указывает на переполнение, даже если базовый потоковый буфер все еще содержит данные во входной последовательности.

/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
  : public boost::iostreams::source // Use convenience class.
{
public:

  explicit
  asio_streambuf_input_device(
      boost::asio::streambuf& streambuf,
      std::streamsize bytes_transferred
  )
    : streambuf_(streambuf),
      bytes_remaining_(bytes_transferred)
  {}

  std::streamsize read(char_type* buffer, std::streamsize buffer_size)
  {
    // Determine max amount of bytes to copy.
    auto bytes_to_copy =
      std::min(bytes_remaining_, std::min(
          static_cast<std::streamsize>(streambuf_.size()), buffer_size));

    // If there is no more data to be read, indicate end-of-sequence per
    // Source concept.
    if (0 == bytes_to_copy)
    {
      return -1; // Indicate end-of-sequence, per Source concept.
    }

    // Copy from the streambuf into the provided buffer.
    std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);

    // Update bytes remaining.
    bytes_remaining_ -= bytes_to_copy;

    // Consume from the streambuf.
    streambuf_.consume(bytes_to_copy);

    return bytes_to_copy;
  }

private:
  boost::asio::streambuf& streambuf_;
  std::streamsize bytes_remaining_;
};

// ...

// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(streambuf, n);
parse(input);

Вот полный пример, демонстрирующий этот подход:

#include <functional>
#include <iostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/iostreams/concepts.hpp>  // boost::iostreams::source
#include <boost/iostreams/stream.hpp>

/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
  : public boost::iostreams::source // Use convenience class.
{
public:

  explicit
  asio_streambuf_input_device(
      boost::asio::streambuf& streambuf,
      std::streamsize bytes_transferred
  )
    : streambuf_(streambuf),
      bytes_remaining_(bytes_transferred)
  {}

  std::streamsize read(char_type* buffer, std::streamsize buffer_size)
  {
    // Determine max amount of bytes to copy.
    auto bytes_to_copy =
      std::min(bytes_remaining_, std::min(
          static_cast<std::streamsize>(streambuf_.size()), buffer_size));

    // If there is no more data to be read, indicate end-of-sequence per
    // Source concept.
    if (0 == bytes_to_copy)
    {
      return -1; // Indicate end-of-sequence, per Source concept.
    }

    // Copy from the streambuf into the provided buffer.
    std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);

    // Update bytes remaining.
    bytes_remaining_ -= bytes_to_copy;

    // Consume from the streambuf.
    streambuf_.consume(bytes_to_copy);

    return bytes_to_copy;
  }

private:
  boost::asio::streambuf& streambuf_;
  std::streamsize bytes_remaining_;
};

/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
  return std::string(buffers_begin(streambuf.data()),
                     buffers_end(streambuf.data()));
}

// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}

int main()
{
  using boost::asio::ip::tcp;
  boost::asio::io_service io_service;

  // Create all I/O objects.
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket server_socket(io_service);
  tcp::socket client_socket(io_service);

  // Connect client and server sockets.
  acceptor.async_accept(server_socket, std::bind(&noop));
  client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
  io_service.run();

  // Write to client.
  const std::string message =
    "12@"
    "345@";
  write(server_socket, boost::asio::buffer(message));

  boost::asio::streambuf streambuf;

  {
    auto bytes_transferred = read_until(client_socket, streambuf, '@');
    // Verify that the entire message "12@345@" was read into
    // streambuf's input sequence.
    assert(message.size() == streambuf.size());
    std::cout << "streambuf contains: " << make_string(streambuf) <<
                  std::endl;

    // Create a custom iostream that sets a limit on the amount of bytes
    // that will be read from the streambuf.
    boost::iostreams::stream<asio_streambuf_input_device> input(
      streambuf, bytes_transferred);

    int data = 0;
    input >> data; // Consumes "12" from input sequence.
    assert(data == 12);
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // Consume "@" from input sequence.
    assert(!input.eof());
    input.get(); // No more data available.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;
  }
  std::cout << "streambuf contains: " << make_string(streambuf) <<
               std::endl;

  {
    // As the streambuf's input sequence already contains the delimiter,
    // this operation will not actually attempt to read data from the
    // socket.
    auto bytes_transferred = read_until(client_socket, streambuf, '@');

    // Create a custom iostream that sets a limit on the amount of bytes
    // that will be read from the streambuf.
    boost::iostreams::stream<asio_streambuf_input_device> input(
      streambuf, bytes_transferred);

    std::string data;
    getline(input, data, '@'); // Consumes delimiter.
    assert(data == "345");
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // Underflow.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;
  }

  assert(streambuf.size() == 0);
  std::cout << "streambuf is empty" << std::endl;
}

Выход:

streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty

Вытекают из boost::asio::streambuf

Можно смело вывести из Boost.Asio's streambuf и реализовать пользовательское поведение. В этом случае целью является ограничение количества байтов istream может извлекать из входной последовательности, прежде чем вызывать переполнение. Это может быть достигнуто путем:

  • Обновление указателей get-area (входной последовательности) потокового буфера таким образом, чтобы он содержал только желаемое количество байтов для чтения. Это достигается установкой конца указателя get-area (egptr) быть n байт после указателя текущей области символа (gptr). В приведенном ниже коде я называю это кадрированием.
  • Обработка underflow(), Если достигнут конец текущего кадра, верните EOF,
/// @brief Type that derives from Boost.Asio streambuf and can frame the
///        input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
  : public boost::asio::basic_streambuf<Allocator>
{
private:

  typedef boost::asio::basic_streambuf<Allocator> parent_type;

public:

  explicit 
  basic_framed_streambuf(
    std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
    const Allocator& allocator = Allocator()
  )
    : parent_type(maximum_size, allocator),
      egptr_(nullptr)
  {}

  /// @brief Limit the current input sequence to n characters.
  ///
  /// @remark An active frame is invalidated by any member function that
  ///        modifies the input or output sequence.
  void frame(std::streamsize n)
  {
    // Store actual end of input sequence.
    egptr_ = this->egptr();
    // Set the input sequence end to n characters from the current
    // input sequence pointer..
    this->setg(this->eback(), this->gptr(), this->gptr() + n);
  }

  /// @brief Restore the end of the input sequence.
  void unframe()
  {
    // Restore the end of the input sequence.
    this->setg(this->eback(), this->gptr(), this->egptr_);
    egptr_ = nullptr;
  }

protected:

  // When the end of the input sequence has been reached, underflow
  // will be invoked.
  typename parent_type::int_type underflow()
  {
    // If the  streambuf is currently framed, then return eof
    // on underflow.  Otherwise, defer to the parent implementation.
    return egptr_ ? parent_type::traits_type::eof()
                  : parent_type::underflow();
  }

private:
  char* egptr_;
};

// ...

basic_framed_streambuf<> streambuf;
// ....
streambuf.frame(n);
std::istream input(&streambuf);
parse(input);
streambuf.unframe();

Вот полный пример, демонстрирующий этот подход:

#include <functional>
#include <iostream>
#include <string>

#include <boost/asio.hpp>

/// @brief Type that derives from Boost.Asio streambuf and can frame the
///        input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
  : public boost::asio::basic_streambuf<Allocator>
{
private:

  typedef boost::asio::basic_streambuf<Allocator> parent_type;

public:

  explicit 
  basic_framed_streambuf(
    std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
    const Allocator& allocator = Allocator()
  )
    : parent_type(maximum_size, allocator),
      egptr_(nullptr)
  {}

  /// @brief Limit the current input sequence to n characters.
  ///
  /// @remark An active frame is invalidated by any member function that
  ///        modifies the input or output sequence.
  void frame(std::streamsize n)
  {
    // Store actual end of input sequence.
    egptr_ = this->egptr();
    // Set the input sequence end to n characters from the current
    // input sequence pointer..
    this->setg(this->eback(), this->gptr(), this->gptr() + n);
  }

  /// @brief Restore the end of the input sequence.
  void unframe()
  {
    // Restore the end of the input sequence.
    this->setg(this->eback(), this->gptr(), this->egptr_);
    egptr_ = nullptr;
  }

protected:

  // When the end of the input sequence has been reached, underflow
  // will be invoked.
  typename parent_type::int_type underflow()
  {
    // If the  streambuf is currently framed, then return eof
    // on underflow.  Otherwise, defer to the parent implementation.
    return egptr_ ? parent_type::traits_type::eof()
                  : parent_type::underflow();
  }

private:
  char* egptr_;
};

typedef basic_framed_streambuf<> framed_streambuf;

/// @brief RAII type that helps frame a basic_framed_streambuf within a 
///        given scope.
template <typename Streambuf>
class streambuf_frame
{
public:
  explicit streambuf_frame(Streambuf& streambuf, std::streamsize n)
    : streambuf_(streambuf)
  {
    streambuf_.frame(n);
  }

  ~streambuf_frame() { streambuf_.unframe(); }

  streambuf_frame(const streambuf_frame&) = delete;
  streambuf_frame& operator=(const streambuf_frame&) = delete;

private:
  Streambuf& streambuf_;
};

/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
  return std::string(buffers_begin(streambuf.data()),
                     buffers_end(streambuf.data()));
}

// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}

int main()
{
  using boost::asio::ip::tcp;
  boost::asio::io_service io_service;

  // Create all I/O objects.
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket server_socket(io_service);
  tcp::socket client_socket(io_service);

  // Connect client and server sockets.
  acceptor.async_accept(server_socket, std::bind(&noop));
  client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
  io_service.run();

  // Write to client.
  const std::string message =
    "12@"
    "345@";
  write(server_socket, boost::asio::buffer(message));

  framed_streambuf streambuf;

  // Demonstrate framing the streambuf's input sequence manually.
  {
    auto bytes_transferred = read_until(client_socket, streambuf, '@');
    // Verify that the entire message "12@345@" was read into
    // streambuf's input sequence.
    assert(message.size() == streambuf.size());
    std::cout << "streambuf contains: " << make_string(streambuf) <<
                  std::endl;

    // Frame the streambuf based on bytes_transferred.  This is all data
    // up to and including the first delimiter.
    streambuf.frame(bytes_transferred);

    // Use an istream to read data from the currently framed streambuf.
    std::istream input(&streambuf);
    int data = 0;
    input >> data; // Consumes "12" from input sequence.
    assert(data == 12);
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // Consume "@" from input sequence.
    assert(!input.eof());
    input.get(); // No more data available in the frame, so underflow.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;

    // Restore the streambuf.
    streambuf.unframe();
  }

  // Demonstrate using an RAII helper to frame the streambuf's input
  // sequence.
  {
    // As the streambuf's input sequence already contains the delimiter,
    // this operation will not actually attempt to read data from the
    // socket.
    auto bytes_transferred = read_until(client_socket, streambuf, '@');
    std::cout << "streambuf contains: " << make_string(streambuf) <<
                  std::endl;

    // Frame the streambuf based on bytes_transferred.  This is all data
    // up to and including the first delimiter.  Use a frame RAII object
    // to only frame the streambuf within the current scope.
    streambuf_frame<framed_streambuf> frame(streambuf, bytes_transferred);

    // Use an istream to read data from the currently framed streambuf.
    std::istream input(&streambuf);
    std::string data;
    getline(input, data, '@'); // Consumes delimiter.
    assert(data == "345");
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // No more data available in the frame, so underflow.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;
    // The frame object's destructor will unframe the streambuf.
  }

  assert(streambuf.size() == 0);
  std::cout << "streambuf is empty" << std::endl;
}

Выход:

streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty

За исключением случая, когда вы используете поток после чтения до тех пор, пока соединение не будет закрыто, я не вижу смысла в strbuf + istream, как это, действительно.

Простая проблема заключается в том, что извлечение istream не будет обновлять поток атомарно при неудачном / частичном разборе, что приведет к потере ввода / повреждения.

Вот ваш пример макета, исправленный для правильного ожидания и получения символов NUL:

Жить на Колиру

#include <iostream>
#include <utility>
#include <algorithm>
#include <boost/asio.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/qi_match.hpp>

namespace asio = boost::asio;

std::istream &parseDocument(std::istream &is, int &data) {
    namespace qi = boost::spirit::qi;
    return is >> qi::match(qi::int_ >> '\0', data);
}

template <typename MutableBuffers> size_t 
    fake_read1(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("12345" "\0" "23", 8)); }

template <typename MutableBuffers> size_t 
    fake_read2(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("456" "\0", 4)); }

#define READ_UNTIL(/*boost::asio::streambuf &*/strbuf, fake_read) { \
    auto buffers = strbuf.prepare(1024); \
    size_t read = fake_read(buffers); \
    std::cout << "READ_UNTIL " #fake_read ": " << read << " bytes\n"; \
    strbuf.commit(read); \
}

int main() {
    // this is the easy scenario:
    {
        asio::streambuf strbuf;

        READ_UNTIL(strbuf, fake_read1);
        READ_UNTIL(strbuf, fake_read2);

        int data1, data2;
        std::istream stream(&strbuf);

        parseDocument(stream, data1);
        parseDocument(stream, data2);

        std::cout << "Yo: " << data1 << "\n";
        std::cout << "Yo: " << data2 << "\n";
    }

    // this is the tricky scenario:
    {
        asio::streambuf strbuf;

        READ_UNTIL(strbuf, fake_read1);
        //READ_UNTIL(strbuf, fake_read2); // will happen later, now we're stuck with a partial second frame

        int data1, data2;
        std::istream stream(&strbuf);

        parseDocument(stream, data1);

        while (!parseDocument(stream, data2)) {
            stream.clear();
            READ_UNTIL(strbuf, fake_read2);
        }

        std::cout << "Oops: " << data1 << "\n";
        std::cout << "Oops: " << data2 << "\n";
    }
}

В "сложном" сценарии вы можете видеть, что частичный пакет (содержащий "23") потерян, а последующий пакет поврежден:

READ_UNTIL fake_read1: 8 bytes
READ_UNTIL fake_read2: 4 bytes
Oops: 12345
Oops: 456

Вы также можете видеть, что я переключаюсь на предпочитаемую платформу для небольших специальных анализаторов: Boost Spirit в parseDocument() функция. Смотрите ниже, как я могу сделать это более применимым.

1. Недостающие потоковые буферы

Вместо этого вы могли бы искать реализацию потокового буфера, которая просто ожидала бы больше данных, когда буфер переполнен.

Я считаю, что, например, asio::ip::tcp::iostream это именно то:

Жить на Колиру

#include <iostream>
#include <boost/asio.hpp>

int main() {
    std::cout << boost::asio::ip::tcp::iostream("127.0.0.1", "6769").rdbuf();
}

Запустите это локально, чтобы увидеть, что ввод поступает по пакетам (например, с netcat)

2. Разбор базового ConstBuffers последовательность

В качестве альтернативы, и в духе нулевого копирования вы можете захотеть проанализировать непосредственно в базовой последовательности буфера, которая лежит в основе asio::streambuf реализация, убедившись, что только consume() что вы успешно проанализировали:

Жить на Колиру

#include <iostream>
#include <boost/asio.hpp>
#include <boost/spirit/include/qi.hpp>

using namespace std;
namespace asio = boost::asio;

using asio::buffers_begin;
using asio::buffers_end;

template <typename ConstBuffers>
size_t parseDocument(ConstBuffers const& buffers, int &data) {

    auto b(buffers_begin(buffers)), f=b, l(buffers_end(buffers));

    namespace qi = boost::spirit::qi;
    return qi::phrase_parse(f, l, qi::int_ >> '\0', qi::space, data)
        ? (f - b) 
        : 0; // only optionally consume
}

template <typename MutableBuffers> size_t 
    fake_read1(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("12345" "\0" "23", 8)); }

template <typename MutableBuffers> size_t 
    fake_read2(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("456" "\0", 4)); }

#define READ_UNTIL(/*boost::asio::streambuf &*/strbuf, fake_read) { \
    auto buffers = strbuf.prepare(1024); \
    size_t read = fake_read(buffers); \
    std::cout << "READ_UNTIL " #fake_read ": " << read << " bytes\n"; \
    strbuf.commit(read); \
}

size_t readuntil2(boost::asio::streambuf &strbuf) {

    std::cout << __PRETTY_FUNCTION__ << "\n";
    static int delay_fake_async_receive = 6;
    if (delay_fake_async_receive--)
        return 0;

    auto buffers = strbuf.prepare(1024);
    size_t read = fake_read2(buffers);
    std::cout << "read2: " << read << " bytes\n";
    strbuf.commit(read);
    return read;
}

#include <boost/range/algorithm.hpp>

int main() {
    // this is the tricky scenario:
    asio::streambuf strbuf;

    READ_UNTIL(strbuf, fake_read1);
    //READ_UNTIL(strbuf, fake_read2); // will happen later, now we're stuck with a partial second frame

    int data1=0, data2=0;

    strbuf.consume(parseDocument(strbuf.data(), data1));

    size_t consumed = 0;
    while (!(consumed = parseDocument(strbuf.data(), data2))) {
        READ_UNTIL(strbuf, fake_read2);
    }

    std::cout << "Yay: " << data1 << "\n";
    std::cout << "Yay: " << data2 << "\n";

    //asio::ip::tcp::iostream networkstream("localhost", "6767");
    std::cout << asio::ip::tcp::iostream("localhost", "6767").rdbuf();
}

Печать

READ_UNTIL fake_read1: 8 bytes
READ_UNTIL fake_read2: 4 bytes
Yay: 12345
Yay: 23456

РЕЗЮМЕ, Интеграция сторонних парсеров

Если вы должны использовать стороннюю библиотеку, которая требует std::istream& для анализа, но вы не можете полагаться на то, что передачи будут выровнены с границами кадра, возможно, вы могли бы использовать гибридный подход:

auto n = find_frame_boundary(buffers_begin(sb.data()), buffers_end(sb.data()));

а потом возможно использовать boost::iostream::array_source на уменьшенной области обнаружено.

Другие вопросы по тегам