Можно ли читать из сокета синхронно, используя Boost.Asio с тайм-аутом в многопоточной службе ввода-вывода?
У меня есть приложение, которое использует Boost.Asio для TCP и UDP сокетов. Я понимаю, что "А" в "Asio" означает " Асинхронный", поэтому библиотека стремится поощрять использование асинхронного ввода-вывода, когда это возможно. У меня есть несколько случаев, когда синхронное чтение сокетов предпочтительнее. В то же время, однако, я хотел бы установить тайм-аут для указанных вызовов, так что нет возможности блокировать чтение бесконечно.
Похоже, что это довольно распространенная проблема среди пользователей Boost.Asio со следующими прошлыми вопросами переполнения стека по этой теме:
- C++ Boost ASIO: как читать / писать с таймаутом?
- asio:: читать с таймаутом
- увеличить тайм-аут asio
- Как установить таймаут на блокировку сокетов в boost asio?
Там может быть даже больше. В документации есть даже примеры того, как реализовать синхронные операции с таймаутами. Они сводятся к преобразованию синхронной операции в асинхронную, а затем ее запуску параллельно с asio::deadline_timer
, Затем обработчик истечения таймера может отменить асинхронное чтение в случае истечения времени ожидания. Это выглядит примерно так (фрагмент взят из приведенного выше примера):
std::size_t receive(const boost::asio::mutable_buffer& buffer,
boost::posix_time::time_duration timeout, boost::system::error_code& ec)
{
// Set a deadline for the asynchronous operation.
deadline_.expires_from_now(timeout);
// Set up the variables that receive the result of the asynchronous
// operation. The error code is set to would_block to signal that the
// operation is incomplete. Asio guarantees that its asynchronous
// operations will never fail with would_block, so any other value in
// ec indicates completion.
ec = boost::asio::error::would_block;
std::size_t length = 0;
// Start the asynchronous operation itself. The handle_receive function
// used as a callback will update the ec and length variables.
socket_.async_receive(boost::asio::buffer(buffer),
boost::bind(&client::handle_receive, _1, _2, &ec, &length));
// Block until the asynchronous operation has completed.
do io_service_.run_one(); while (ec == boost::asio::error::would_block);
return length;
}
На самом деле это относительно чистое решение: запустить асинхронные операции, а затем вручную опросить asio::io_service
выполнять асинхронные обработчики по одному, пока либо async_receive()
завершается или таймер истекает.
Однако как насчет случая, когда базовая служба ввода-вывода сокета уже запущена в одном или нескольких фоновых потоках? В этом случае нет гарантии, что обработчики для асинхронных операций будут выполняться потоком переднего плана в приведенном выше фрагменте, поэтому run_one()
не вернется, пока не выполнится какой-то более поздний, возможно не связанный, обработчик. Это сделало бы чтение сокета довольно безразличным.
asio::io_service
имеет poll_one()
функция, которая будет проверять очередь службы без блокировки, но я не вижу хорошего способа блокировать поток переднего плана (эмулируя поведение синхронного вызова), пока не выполнится обработчик, за исключением случая, когда нет фоновых потоков, выполняющих asio::io_service::run()
уже.
Я вижу два возможных решения, ни одно из которых мне не нравится:
Используйте условную переменную или аналогичную конструкцию, чтобы сделать блок потока переднего плана после запуска асинхронных операций. В обработчике для
async_receive()
вызов, сигнал условной переменной, чтобы разблокировать поток. Это вызывает некоторую блокировку для каждого чтения, чего я хотел бы избежать, поскольку я хотел бы достичь максимально возможной пропускной способности при чтении через сокет UDP. В противном случае, это жизнеспособно, и, вероятно, это то, что я бы сделал, если не представлен лучший метод.Убедитесь, что у сокета есть свой
asio::io_service
это не выполняется никакими фоновыми потоками. Это усложняет использование асинхронного ввода-вывода с сокетом в тех случаях, когда это необходимо.
Какие-нибудь идеи для других способов сделать это безопасным способом?
В сторону: Есть несколько ответов на предыдущие вопросы SO, которые рекомендуют использовать SO_RCVTIMEO
опция сокета для реализации тайм-аута чтения сокета. В теории это звучит замечательно, но, похоже, это не работает на моей платформе (Ubuntu 12.04, Boost v1.55). Я могу установить время ожидания сокета, но это не даст желаемого эффекта с Asio. Соответствующий код находится в /boost/asio/detail/impl/socket_ops.ipp
:
size_t sync_recvfrom(socket_type s, state_type state, buf* bufs,
size_t count, int flags, socket_addr_type* addr,
std::size_t* addrlen, boost::system::error_code& ec)
{
if (s == invalid_socket)
{
ec = boost::asio::error::bad_descriptor;
return 0;
}
// Read some data.
for (;;)
{
// Try to complete the operation without blocking.
signed_size_type bytes = socket_ops::recvfrom(
s, bufs, count, flags, addr, addrlen, ec);
// Check if operation succeeded.
if (bytes >= 0)
return bytes;
// Operation failed.
if ((state & user_set_non_blocking)
|| (ec != boost::asio::error::would_block
&& ec != boost::asio::error::try_again))
return 0;
// Wait for socket to become ready.
if (socket_ops::poll_read(s, 0, ec) < 0)
return 0;
}
}
Если время чтения сокета истекло, вызов recvfrom()
выше вернется EAGAIN
или же EWOULDBLOCK
, которые переводятся на boost::asio::error::try_again
или же boost::asio::error::would_block
, В этом случае приведенный выше код вызовет poll_read()
функция, которая для моей платформы выглядит так:
int poll_read(socket_type s, state_type state, boost::system::error_code& ec)
{
if (s == invalid_socket)
{
ec = boost::asio::error::bad_descriptor;
return socket_error_retval;
}
pollfd fds;
fds.fd = s;
fds.events = POLLIN;
fds.revents = 0;
int timeout = (state & user_set_non_blocking) ? 0 : -1;
clear_last_error();
int result = error_wrapper(::poll(&fds, 1, timeout), ec);
if (result == 0)
ec = (state & user_set_non_blocking)
? boost::asio::error::would_block : boost::system::error_code();
else if (result > 0)
ec = boost::system::error_code();
return result;
}
Я выхватил код, условно скомпилированный для других платформ. Как вы можете видеть, если сокет не является неблокирующим, он завершает вызов poll()
с бесконечным тайм-аутом, поэтому блокируется до тех пор, пока сокет не получит данные для чтения (и предотвращает попытку тайм-аута). Таким образом SO_RCVTIMEO
вариант не эффективен.
1 ответ
Поддержка Boost.Asio для futures
может предоставить элегантное решение. Когда асинхронная операция предоставляется boost::asio::use_future
значение в качестве обработчика завершения, инициирующая функция вернет std::future
объект, который получит результат операции. Кроме того, если операция завершается с ошибкой, error_code
превращается в system_error
и передается звонящему через future
,
В примере клиента Datytime Boost.Asio C++11 Futures выделенный поток запускает io_service
и основной поток инициирует асинхронные операции, а затем синхронно ожидает завершения операций, как показано ниже:
std::array<char, 128> recv_buf;
udp::endpoint sender_endpoint;
std::future<std::size_t> recv_length =
socket.async_receive_from(
boost::asio::buffer(recv_buf),
sender_endpoint,
boost::asio::use_future);
// Do other things here while the receive completes.
std::cout.write(
recv_buf.data(),
recv_length.get()); // Blocks until receive is complete.
Когда используешь future
s, общий подход к реализации синхронного чтения с тайм-аутом такой же, как и раньше. Вместо синхронного чтения можно использовать асинхронное чтение и асинхронное ожидание по таймеру. Единственное незначительное изменение заключается в том, что вместо того, чтобы блокировать io_service
или периодически проверяя предикат, можно вызвать future::get()
блокировать до тех пор, пока операция не будет завершена с успехом или неудачей (например, тайм-аут).
Если C++ 11 недоступен, то тип возврата для асинхронных операций может быть настроен для Boost. future
, как показано в этом ответе.