Как бороться с дополнительными символами, читаемыми в ASIO streambuf?
В большинстве библиотек парсеры работают только над std::istream
или один непрерывный буфер. Эти парсеры читают istream до конца, а не до конца документа. Даже если есть хороший boost::asio::streambuf
, которые могут быть использованы с istream
Существует проблема с чтением и фиксацией только одного кадра. Функции как read_until
передают все, что они читают, и если они читают фрагмент следующего кадра, заполнение при разборе завершается неудачно.
Этот насмешливый пример на Колиру показывает проблему.
Предполагая, что нам нужно эффективное решение без копирования буферов, я должен убедиться, что конец потока является правильным концом документа. Мое текущее решение состоит в том, чтобы сканировать данные и умножать фиксацию / потребление в течение одного подготовленного буфера:
size_t read_some_frames( boost::asio::streambuf& strbuf,
std::function< void(istream&) > parser ) {
auto buffers= strbuf.prepare( 1024 );
size_t read= bad_case_of_read_some( buffers );
vector< std::pair< size_t, size_t > > frames;
std::pair< size_t, size_t > leftover= scanForFrames(
buffers_begin(buffers),
buffers_begin(buffers)+read,
frames, '\0' );
for( auto const& frame: frames ) {
cout << "Frame size: " << frame.first
<< " skip: " << frame.second << endl;
strbuf.commit( frame.first );
strbuf.consume( frame.second );
iostream stream( &strbuf );
parser( stream );
}
cout << "Unfinished frame size: " << leftover.first
<< " skip:" << leftover.second << endl;
strbuf.commit( leftover.first );
strbuf.consume( leftover.second );
return read;
}
Согласно документации, это неправильно. Я думаю, что этот код работает, потому что этот вызов commit и потребление не освобождает внутренние буферы. Как-то мне нужно разобраться с этим.
Каковы возможные решения?
2 ответа
В то время как read_until()
операции фиксируют все данные, прочитанные во входную последовательность streambuf, они возвращают bytes_transferred
значение, содержащее количество байтов до и включая первый разделитель. По сути, это обеспечивает размер кадра, и можно ограничить istream
читать только часть streambuf
входная последовательность:
- Использование пользовательских
istream
это ограничивает количество байтов, считываемых из потокового буфера. Один из самых простых способов сделать это - использовать Boost.IOStream'sboost::iostreams::stream
и реализовать модель концепции источника. - Создать кастом
streambuf
что происходит от Boost.Asio'sstreambuf
, Чтобы ограничить количество байтов, считываемых из доступной входной последовательности, пользовательские функции должны будут манипулировать концом входной последовательности. Кроме того, обычайstreambuf
нужно будет справиться с недостаточным объемом.
изготовленный на заказ Source
для Boost.IOStream
Boost.IOStream-х boost::iostreams::stream
объект делегирует операции ввода-вывода на устройство. Устройство - это пользовательский код, который реализует модель различных концепций Boost.IOStream. В этом случае концепция Source, которая обеспечивает доступ для чтения к последовательности символов, является единственной необходимой. Кроме того, когда boost::iostreams::stream
использует исходное устройство, оно будет наследоваться от std::basic_istream
,
В следующем коде asio_streambuf_input_device
является моделью концепции Source, которая читает из потокового буфера Boost.Asio. Когда определенное количество байтов было прочитано, asio_streambuf_input_device
указывает на переполнение, даже если базовый потоковый буфер все еще содержит данные во входной последовательности.
/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
: public boost::iostreams::source // Use convenience class.
{
public:
explicit
asio_streambuf_input_device(
boost::asio::streambuf& streambuf,
std::streamsize bytes_transferred
)
: streambuf_(streambuf),
bytes_remaining_(bytes_transferred)
{}
std::streamsize read(char_type* buffer, std::streamsize buffer_size)
{
// Determine max amount of bytes to copy.
auto bytes_to_copy =
std::min(bytes_remaining_, std::min(
static_cast<std::streamsize>(streambuf_.size()), buffer_size));
// If there is no more data to be read, indicate end-of-sequence per
// Source concept.
if (0 == bytes_to_copy)
{
return -1; // Indicate end-of-sequence, per Source concept.
}
// Copy from the streambuf into the provided buffer.
std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);
// Update bytes remaining.
bytes_remaining_ -= bytes_to_copy;
// Consume from the streambuf.
streambuf_.consume(bytes_to_copy);
return bytes_to_copy;
}
private:
boost::asio::streambuf& streambuf_;
std::streamsize bytes_remaining_;
};
// ...
// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(streambuf, n);
parse(input);
Вот полный пример, демонстрирующий этот подход:
#include <functional>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/iostreams/concepts.hpp> // boost::iostreams::source
#include <boost/iostreams/stream.hpp>
/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
: public boost::iostreams::source // Use convenience class.
{
public:
explicit
asio_streambuf_input_device(
boost::asio::streambuf& streambuf,
std::streamsize bytes_transferred
)
: streambuf_(streambuf),
bytes_remaining_(bytes_transferred)
{}
std::streamsize read(char_type* buffer, std::streamsize buffer_size)
{
// Determine max amount of bytes to copy.
auto bytes_to_copy =
std::min(bytes_remaining_, std::min(
static_cast<std::streamsize>(streambuf_.size()), buffer_size));
// If there is no more data to be read, indicate end-of-sequence per
// Source concept.
if (0 == bytes_to_copy)
{
return -1; // Indicate end-of-sequence, per Source concept.
}
// Copy from the streambuf into the provided buffer.
std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);
// Update bytes remaining.
bytes_remaining_ -= bytes_to_copy;
// Consume from the streambuf.
streambuf_.consume(bytes_to_copy);
return bytes_to_copy;
}
private:
boost::asio::streambuf& streambuf_;
std::streamsize bytes_remaining_;
};
/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
return std::string(buffers_begin(streambuf.data()),
buffers_end(streambuf.data()));
}
// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}
int main()
{
using boost::asio::ip::tcp;
boost::asio::io_service io_service;
// Create all I/O objects.
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
tcp::socket server_socket(io_service);
tcp::socket client_socket(io_service);
// Connect client and server sockets.
acceptor.async_accept(server_socket, std::bind(&noop));
client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
io_service.run();
// Write to client.
const std::string message =
"12@"
"345@";
write(server_socket, boost::asio::buffer(message));
boost::asio::streambuf streambuf;
{
auto bytes_transferred = read_until(client_socket, streambuf, '@');
// Verify that the entire message "12@345@" was read into
// streambuf's input sequence.
assert(message.size() == streambuf.size());
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(
streambuf, bytes_transferred);
int data = 0;
input >> data; // Consumes "12" from input sequence.
assert(data == 12);
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // Consume "@" from input sequence.
assert(!input.eof());
input.get(); // No more data available.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
}
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
{
// As the streambuf's input sequence already contains the delimiter,
// this operation will not actually attempt to read data from the
// socket.
auto bytes_transferred = read_until(client_socket, streambuf, '@');
// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(
streambuf, bytes_transferred);
std::string data;
getline(input, data, '@'); // Consumes delimiter.
assert(data == "345");
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // Underflow.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
}
assert(streambuf.size() == 0);
std::cout << "streambuf is empty" << std::endl;
}
Выход:
streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty
Вытекают из boost::asio::streambuf
Можно смело вывести из Boost.Asio's streambuf
и реализовать пользовательское поведение. В этом случае целью является ограничение количества байтов istream
может извлекать из входной последовательности, прежде чем вызывать переполнение. Это может быть достигнуто путем:
- Обновление указателей get-area (входной последовательности) потокового буфера таким образом, чтобы он содержал только желаемое количество байтов для чтения. Это достигается установкой конца указателя get-area (
egptr
) бытьn
байт после указателя текущей области символа (gptr
). В приведенном ниже коде я называю это кадрированием. - Обработка
underflow()
, Если достигнут конец текущего кадра, вернитеEOF
,
/// @brief Type that derives from Boost.Asio streambuf and can frame the
/// input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
: public boost::asio::basic_streambuf<Allocator>
{
private:
typedef boost::asio::basic_streambuf<Allocator> parent_type;
public:
explicit
basic_framed_streambuf(
std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
const Allocator& allocator = Allocator()
)
: parent_type(maximum_size, allocator),
egptr_(nullptr)
{}
/// @brief Limit the current input sequence to n characters.
///
/// @remark An active frame is invalidated by any member function that
/// modifies the input or output sequence.
void frame(std::streamsize n)
{
// Store actual end of input sequence.
egptr_ = this->egptr();
// Set the input sequence end to n characters from the current
// input sequence pointer..
this->setg(this->eback(), this->gptr(), this->gptr() + n);
}
/// @brief Restore the end of the input sequence.
void unframe()
{
// Restore the end of the input sequence.
this->setg(this->eback(), this->gptr(), this->egptr_);
egptr_ = nullptr;
}
protected:
// When the end of the input sequence has been reached, underflow
// will be invoked.
typename parent_type::int_type underflow()
{
// If the streambuf is currently framed, then return eof
// on underflow. Otherwise, defer to the parent implementation.
return egptr_ ? parent_type::traits_type::eof()
: parent_type::underflow();
}
private:
char* egptr_;
};
// ...
basic_framed_streambuf<> streambuf;
// ....
streambuf.frame(n);
std::istream input(&streambuf);
parse(input);
streambuf.unframe();
Вот полный пример, демонстрирующий этот подход:
#include <functional>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
/// @brief Type that derives from Boost.Asio streambuf and can frame the
/// input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
: public boost::asio::basic_streambuf<Allocator>
{
private:
typedef boost::asio::basic_streambuf<Allocator> parent_type;
public:
explicit
basic_framed_streambuf(
std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
const Allocator& allocator = Allocator()
)
: parent_type(maximum_size, allocator),
egptr_(nullptr)
{}
/// @brief Limit the current input sequence to n characters.
///
/// @remark An active frame is invalidated by any member function that
/// modifies the input or output sequence.
void frame(std::streamsize n)
{
// Store actual end of input sequence.
egptr_ = this->egptr();
// Set the input sequence end to n characters from the current
// input sequence pointer..
this->setg(this->eback(), this->gptr(), this->gptr() + n);
}
/// @brief Restore the end of the input sequence.
void unframe()
{
// Restore the end of the input sequence.
this->setg(this->eback(), this->gptr(), this->egptr_);
egptr_ = nullptr;
}
protected:
// When the end of the input sequence has been reached, underflow
// will be invoked.
typename parent_type::int_type underflow()
{
// If the streambuf is currently framed, then return eof
// on underflow. Otherwise, defer to the parent implementation.
return egptr_ ? parent_type::traits_type::eof()
: parent_type::underflow();
}
private:
char* egptr_;
};
typedef basic_framed_streambuf<> framed_streambuf;
/// @brief RAII type that helps frame a basic_framed_streambuf within a
/// given scope.
template <typename Streambuf>
class streambuf_frame
{
public:
explicit streambuf_frame(Streambuf& streambuf, std::streamsize n)
: streambuf_(streambuf)
{
streambuf_.frame(n);
}
~streambuf_frame() { streambuf_.unframe(); }
streambuf_frame(const streambuf_frame&) = delete;
streambuf_frame& operator=(const streambuf_frame&) = delete;
private:
Streambuf& streambuf_;
};
/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
return std::string(buffers_begin(streambuf.data()),
buffers_end(streambuf.data()));
}
// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}
int main()
{
using boost::asio::ip::tcp;
boost::asio::io_service io_service;
// Create all I/O objects.
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
tcp::socket server_socket(io_service);
tcp::socket client_socket(io_service);
// Connect client and server sockets.
acceptor.async_accept(server_socket, std::bind(&noop));
client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
io_service.run();
// Write to client.
const std::string message =
"12@"
"345@";
write(server_socket, boost::asio::buffer(message));
framed_streambuf streambuf;
// Demonstrate framing the streambuf's input sequence manually.
{
auto bytes_transferred = read_until(client_socket, streambuf, '@');
// Verify that the entire message "12@345@" was read into
// streambuf's input sequence.
assert(message.size() == streambuf.size());
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
// Frame the streambuf based on bytes_transferred. This is all data
// up to and including the first delimiter.
streambuf.frame(bytes_transferred);
// Use an istream to read data from the currently framed streambuf.
std::istream input(&streambuf);
int data = 0;
input >> data; // Consumes "12" from input sequence.
assert(data == 12);
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // Consume "@" from input sequence.
assert(!input.eof());
input.get(); // No more data available in the frame, so underflow.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
// Restore the streambuf.
streambuf.unframe();
}
// Demonstrate using an RAII helper to frame the streambuf's input
// sequence.
{
// As the streambuf's input sequence already contains the delimiter,
// this operation will not actually attempt to read data from the
// socket.
auto bytes_transferred = read_until(client_socket, streambuf, '@');
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
// Frame the streambuf based on bytes_transferred. This is all data
// up to and including the first delimiter. Use a frame RAII object
// to only frame the streambuf within the current scope.
streambuf_frame<framed_streambuf> frame(streambuf, bytes_transferred);
// Use an istream to read data from the currently framed streambuf.
std::istream input(&streambuf);
std::string data;
getline(input, data, '@'); // Consumes delimiter.
assert(data == "345");
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // No more data available in the frame, so underflow.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
// The frame object's destructor will unframe the streambuf.
}
assert(streambuf.size() == 0);
std::cout << "streambuf is empty" << std::endl;
}
Выход:
streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty
За исключением случая, когда вы используете поток после чтения до тех пор, пока соединение не будет закрыто, я не вижу смысла в strbuf + istream, как это, действительно.
Простая проблема заключается в том, что извлечение istream не будет обновлять поток атомарно при неудачном / частичном разборе, что приведет к потере ввода / повреждения.
Вот ваш пример макета, исправленный для правильного ожидания и получения символов NUL:
#include <iostream>
#include <utility>
#include <algorithm>
#include <boost/asio.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/qi_match.hpp>
namespace asio = boost::asio;
std::istream &parseDocument(std::istream &is, int &data) {
namespace qi = boost::spirit::qi;
return is >> qi::match(qi::int_ >> '\0', data);
}
template <typename MutableBuffers> size_t
fake_read1(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("12345" "\0" "23", 8)); }
template <typename MutableBuffers> size_t
fake_read2(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("456" "\0", 4)); }
#define READ_UNTIL(/*boost::asio::streambuf &*/strbuf, fake_read) { \
auto buffers = strbuf.prepare(1024); \
size_t read = fake_read(buffers); \
std::cout << "READ_UNTIL " #fake_read ": " << read << " bytes\n"; \
strbuf.commit(read); \
}
int main() {
// this is the easy scenario:
{
asio::streambuf strbuf;
READ_UNTIL(strbuf, fake_read1);
READ_UNTIL(strbuf, fake_read2);
int data1, data2;
std::istream stream(&strbuf);
parseDocument(stream, data1);
parseDocument(stream, data2);
std::cout << "Yo: " << data1 << "\n";
std::cout << "Yo: " << data2 << "\n";
}
// this is the tricky scenario:
{
asio::streambuf strbuf;
READ_UNTIL(strbuf, fake_read1);
//READ_UNTIL(strbuf, fake_read2); // will happen later, now we're stuck with a partial second frame
int data1, data2;
std::istream stream(&strbuf);
parseDocument(stream, data1);
while (!parseDocument(stream, data2)) {
stream.clear();
READ_UNTIL(strbuf, fake_read2);
}
std::cout << "Oops: " << data1 << "\n";
std::cout << "Oops: " << data2 << "\n";
}
}
В "сложном" сценарии вы можете видеть, что частичный пакет (содержащий "23") потерян, а последующий пакет поврежден:
READ_UNTIL fake_read1: 8 bytes
READ_UNTIL fake_read2: 4 bytes
Oops: 12345
Oops: 456
Вы также можете видеть, что я переключаюсь на предпочитаемую платформу для небольших специальных анализаторов: Boost Spirit в parseDocument()
функция. Смотрите ниже, как я могу сделать это более применимым.
1. Недостающие потоковые буферы
Вместо этого вы могли бы искать реализацию потокового буфера, которая просто ожидала бы больше данных, когда буфер переполнен.
Я считаю, что, например, asio::ip::tcp::iostream
это именно то:
#include <iostream>
#include <boost/asio.hpp>
int main() {
std::cout << boost::asio::ip::tcp::iostream("127.0.0.1", "6769").rdbuf();
}
Запустите это локально, чтобы увидеть, что ввод поступает по пакетам (например, с netcat)
2. Разбор базового ConstBuffers
последовательность
В качестве альтернативы, и в духе нулевого копирования вы можете захотеть проанализировать непосредственно в базовой последовательности буфера, которая лежит в основе asio::streambuf
реализация, убедившись, что только consume()
что вы успешно проанализировали:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/spirit/include/qi.hpp>
using namespace std;
namespace asio = boost::asio;
using asio::buffers_begin;
using asio::buffers_end;
template <typename ConstBuffers>
size_t parseDocument(ConstBuffers const& buffers, int &data) {
auto b(buffers_begin(buffers)), f=b, l(buffers_end(buffers));
namespace qi = boost::spirit::qi;
return qi::phrase_parse(f, l, qi::int_ >> '\0', qi::space, data)
? (f - b)
: 0; // only optionally consume
}
template <typename MutableBuffers> size_t
fake_read1(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("12345" "\0" "23", 8)); }
template <typename MutableBuffers> size_t
fake_read2(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("456" "\0", 4)); }
#define READ_UNTIL(/*boost::asio::streambuf &*/strbuf, fake_read) { \
auto buffers = strbuf.prepare(1024); \
size_t read = fake_read(buffers); \
std::cout << "READ_UNTIL " #fake_read ": " << read << " bytes\n"; \
strbuf.commit(read); \
}
size_t readuntil2(boost::asio::streambuf &strbuf) {
std::cout << __PRETTY_FUNCTION__ << "\n";
static int delay_fake_async_receive = 6;
if (delay_fake_async_receive--)
return 0;
auto buffers = strbuf.prepare(1024);
size_t read = fake_read2(buffers);
std::cout << "read2: " << read << " bytes\n";
strbuf.commit(read);
return read;
}
#include <boost/range/algorithm.hpp>
int main() {
// this is the tricky scenario:
asio::streambuf strbuf;
READ_UNTIL(strbuf, fake_read1);
//READ_UNTIL(strbuf, fake_read2); // will happen later, now we're stuck with a partial second frame
int data1=0, data2=0;
strbuf.consume(parseDocument(strbuf.data(), data1));
size_t consumed = 0;
while (!(consumed = parseDocument(strbuf.data(), data2))) {
READ_UNTIL(strbuf, fake_read2);
}
std::cout << "Yay: " << data1 << "\n";
std::cout << "Yay: " << data2 << "\n";
//asio::ip::tcp::iostream networkstream("localhost", "6767");
std::cout << asio::ip::tcp::iostream("localhost", "6767").rdbuf();
}
Печать
READ_UNTIL fake_read1: 8 bytes
READ_UNTIL fake_read2: 4 bytes
Yay: 12345
Yay: 23456
РЕЗЮМЕ, Интеграция сторонних парсеров
Если вы должны использовать стороннюю библиотеку, которая требует std::istream&
для анализа, но вы не можете полагаться на то, что передачи будут выровнены с границами кадра, возможно, вы могли бы использовать гибридный подход:
auto n = find_frame_boundary(buffers_begin(sb.data()), buffers_end(sb.data()));
а потом возможно использовать boost::iostream::array_source
на уменьшенной области обнаружено.