Соответствие обратных вызовов boost::deadline_timer к соответствующему wait_async

Рассмотрим этот короткий фрагмент кода, где один boost::deadline_timer прерывает другой:

#include <iostream>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/asio.hpp>

static boost::asio::io_service io;
boost::asio::deadline_timer timer1(io);
boost::asio::deadline_timer timer2(io);

static void timer1_handler1(const boost::system::error_code& error)
{
    std::cout << __PRETTY_FUNCTION__ << " time:" << time(0) << " error:" << error.message() << " expect:Operation canceled." << std::endl;        
}        

static void timer1_handler2(const boost::system::error_code& error)
{
    std::cout << __PRETTY_FUNCTION__ << " time:" << time(0) << " error:" << error.message() << " expect:success." << std::endl;        
}        

static void timer2_handler1(const boost::system::error_code& error)
{
    std::cout << __PRETTY_FUNCTION__ << " time:" << time(0) << " error:" << error.message() << " expect:success." << std::endl;        
    std::cout << "cancel and restart timer1. Bind to timer1_handler2" << std::endl;
    timer1.cancel();
    timer1.expires_from_now(boost::posix_time::milliseconds(10000));
    timer1.async_wait(boost::bind(timer1_handler2, boost::asio::placeholders::error));        
}        

int main()
{
    std::cout << "Start timer1. Bind to timer1_handler1." << std::endl;
    timer1.expires_from_now(boost::posix_time::milliseconds(2000));
    timer1.async_wait(boost::bind(timer1_handler1, boost::asio::placeholders::error));        

    std::cout << "Start timer2. Bind to timer2_handler1. Will interrupt timer1." << std::endl;
    timer2.expires_from_now(boost::posix_time::milliseconds(2000));
    timer2.async_wait(boost::bind(timer2_handler1, boost::asio::placeholders::error));        

    std::cout << "Run the boost io service." << std::endl;
    io.run();

    return 0;
}

Если время для timer2 варьируется в пределах отметки 2 секунды, иногда timer1_handler1 сообщает об успехе, а иногда операция отменяется. Это, вероятно, определено в тривиальном примере, потому что мы знаем, на какое время timer2 установлено.

./timer1
Start timer1. Bind to timer1_handler1.
Start timer2. Bind to timer2_handler1. Will interrupt timer1.
Run the boost io service.
void timer1_handler1(const boost::system::error_code&) time:1412680360 error:Success expect:Operation canceled.
void timer2_handler1(const boost::system::error_code&) time:1412680360 error:Success expect:success.
cancel and restart timer1. Bind to timer1_handler2
void timer1_handler2(const boost::system::error_code&) time:1412680370 error:Success expect:success.

Это представляет собой более сложную систему, где timer1 реализует тайм-аут, а timer2 действительно является асинхронным сокетом. Иногда я наблюдал сценарий, когда timer1 отменяется слишком поздно, и первый обработчик возвращается после вызова второго async_wait(), таким образом давая ложный тайм-аут.

Ясно, что мне нужно сопоставить обратные вызовы обработчика с соответствующим вызовом async_wait(). Есть ли удобный способ сделать это?

2 ответа

Решение

Вы можете boost::bind дополнительные параметры для обработчика завершения, которые можно использовать для идентификации источника.

Одним из удобных способов решения поставленной проблемы, управления высокоуровневыми асинхронными операциями, состоящими из нескольких несвязанных асинхронных операций, является использование подхода, использованного в официальном примере таймаута Boost. Внутри него обработчики принимают решения, изучая текущее состояние, а не связывая логику обработчика с ожидаемым или предоставленным состоянием.

Перед началом работы над решением важно выявить все возможные случаи выполнения обработчика. Когда io_service выполняется одна итерация цикла событий, которая выполнит все операции, которые готовы к выполнению, а по завершении операции обработчик завершения пользователя будет поставлен в очередь с error_code с указанием статуса операции. io_service затем вызовет обработчики завершения из очереди. Следовательно, в одной итерации все готовые к выполнению операции выполняются в неуказанном порядке до обработчиков завершения, а порядок, в котором вызываются обработчики завершения, не определен. Например, при составлении async_read_with_timeout() операция от async_read() а также async_wait()где любая операция отменяется только в обработчике завершения другой операции, возможен следующий случай:

  • async_read() работает и async_wait() не готов к запуску, то async_read()обработчик завершения вызывается и отменяет async_wait(), вызывая async_wait()обработчик завершения для запуска с ошибкой boost::asio::error::operation_aborted,
  • async_read() не готов бежать и async_wait() бежит, тогда async_wait()обработчик завершения вызывается и отменяет async_read(), вызывая async_read()обработчик завершения для запуска с ошибкой boost::asio::error::operation_aborted,
  • async_read() а также async_wait() беги, тогда async_read()обработчик завершения вызывается первым, но async_wait() операция уже завершена и не может быть отменена, поэтому async_wait()обработчик завершения будет работать без ошибок.
  • async_read() а также async_wait() беги, тогда async_wait()обработчик завершения вызывается первым, но async_read() операция уже завершена и не может быть отменена, поэтому async_read()обработчик завершения будет работать без ошибок.

Обработчик завершения error_code указывает на состояние операции и не отражает изменения состояния, вызванные другими обработчиками завершения; поэтому, когда error_code успешно, может потребоваться проверить текущее состояние для выполнения условного ветвления. Однако, прежде чем вводить дополнительное состояние, может потребоваться попытаться изучить цель операции более высокого уровня и то, какое состояние уже доступно. Для этого примера давайте определим, что цель async_read_with_timeout() закрывать сокет, если данные не были получены до истечения срока. Для состояния сокет либо открыт, либо закрыт; таймер обеспечивает время истечения; и системные часы показывают текущее время. Изучив цель и имеющуюся информацию о состоянии, можно предложить следующее:

  • async_wait()Обработчик должен закрывать сокет, только если текущее время истечения таймера прошло.
  • async_read()Обработчик должен установить время истечения таймера в будущем.

При таком подходе, если async_read()обработчик завершения запускается до async_wait()то либо async_wait() будет отменен или async_wait()Обработчик завершения не будет закрывать соединение, так как текущее время истечения в будущем. С другой стороны, если async_wait()обработчик завершения запускается до async_read()то либо async_read() будет отменен или async_read()Обработчик завершения может обнаружить, что сокет закрыт.

Вот полный минимальный пример, демонстрирующий этот подход для различных вариантов использования:

#include <cassert>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class client
{
public:

  // This demo is only using status for asserting code paths.  It is not
  // necessary nor should it be used for conditional branching.
  enum status_type
  {
    unknown,
    timeout,
    read_success,
    read_failure
  };

public:

  client(boost::asio::ip::tcp::socket& socket)
    : strand_(socket.get_io_service()),
      timer_(socket.get_io_service()),
      socket_(socket),
      status_(unknown)
  {}

  status_type status() const { return status_; }

  void async_read_with_timeout(boost::posix_time::seconds seconds)
  {
    strand_.post(boost::bind(
        &client::do_async_read_with_timeout, this, seconds));
  }

private:

  void do_async_read_with_timeout(boost::posix_time::seconds seconds)
  {
    // Start a timeout for the read.
    timer_.expires_from_now(seconds);
    timer_.async_wait(strand_.wrap(boost::bind(
        &client::handle_wait, this,
        boost::asio::placeholders::error)));

    // Start the read operation.
    boost::asio::async_read(socket_,  
        boost::asio::buffer(buffer_),
        strand_.wrap(boost::bind(
          &client::handle_read, this,
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred)));
  }

  void handle_wait(const boost::system::error_code& error)
  {
    // On error, such as cancellation, return early.
    if (error)
    {
      std::cout << "timeout cancelled" << std::endl;
      return;
    }

    // The timer may have expired, but it is possible that handle_read()
    // ran succesfully and updated the timer's expiration:
    // - a new timeout has been started.  For example, handle_read() ran and
    //   invoked do_async_read_with_timeout().
    // - there are no pending timeout reads.  For example, handle_read() ran
    //   but did not invoke do_async_read_with_timeout();
    if (timer_.expires_at() > boost::asio::deadline_timer::traits_type::now())
    {
      std::cout << "timeout occured, but handle_read ran first" << std::endl;
      return;
    }

    // Otherwise, a timeout has occured and handle_read() has not executed, so
    // close the socket, cancelling the read operation.
    std::cout << "timeout occured" << std::endl;
    status_ = client::timeout;
    boost::system::error_code ignored_ec;
    socket_.close(ignored_ec);
  }

  void handle_read(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
  {
    // Update timeout state to indicate handle_read() has ran.  This
    // cancels any pending timeouts.
    timer_.expires_at(boost::posix_time::pos_infin);

    // On error, return early.
    if (error)
    {
      std::cout << "read failed: " << error.message() << std::endl;
      // Only set status if it is unknown.
      if (client::unknown == status_) status_ = client::read_failure;
      return;
    }

    // The read was succesful, but if a timeout occured and handle_wait()
    // ran first, then the socket is closed, so return early.
    if (!socket_.is_open())
    {
      std::cout << "read was succesful but timeout occured" << std::endl;
      return;
    }

    std::cout << "read was succesful" << std::endl;
    status_ = client::read_success;
  }

private:

  boost::asio::io_service::strand strand_;
  boost::asio::deadline_timer timer_;
  boost::asio::ip::tcp::socket& socket_;
  char buffer_[1];
  status_type status_;
};

// This example is not interested in the connect handlers, so provide a noop
// function that will be passed to bind to meet the handler concept
// requirements.
void noop() {}

/// @brief Create a connection between the server and client socket.
void connect_sockets(
  boost::asio::ip::tcp::acceptor& acceptor,
  boost::asio::ip::tcp::socket& server_socket,
  boost::asio::ip::tcp::socket& client_socket)
{
  boost::asio::io_service& io_service = acceptor.get_io_service();
  acceptor.async_accept(server_socket, boost::bind(&noop));
  client_socket.async_connect(acceptor.local_endpoint(), boost::bind(&noop));
  io_service.reset();
  io_service.run();
  io_service.reset();
}

int main()
{
  using boost::asio::ip::tcp;
  boost::asio::io_service io_service;
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));

  // Scenario 1: timeout
  // The server writes no data, causing a client timeout to occur.
  {
    std::cout << "[Scenario 1: timeout]" << std::endl;
    // Create and connect I/O objects.
    tcp::socket server_socket(io_service);
    tcp::socket client_socket(io_service);
    connect_sockets(acceptor, server_socket, client_socket);

    // Start read with timeout on client.
    client client(client_socket);
    client.async_read_with_timeout(boost::posix_time::seconds(0));

    // Allow do_read_with_timeout to intiate actual operations.
    io_service.run_one();    

    // Run timeout and read operations.
    io_service.run();
    assert(client.status() == client::timeout);
  }

  // Scenario 2: no timeout, succesful read
  // The server writes data and the io_service is ran before the timer 
  // expires.  In this case, the async_read operation will complete and
  // cancel the async_wait.
  {
    std::cout << "[Scenario 2: no timeout, succesful read]" << std::endl;
    // Create and connect I/O objects.
    tcp::socket server_socket(io_service);
    tcp::socket client_socket(io_service);
    connect_sockets(acceptor, server_socket, client_socket);

    // Start read with timeout on client.
    client client(client_socket);
    client.async_read_with_timeout(boost::posix_time::seconds(10));

    // Allow do_read_with_timeout to intiate actual operations.
    io_service.run_one();

    // Write to client.
    boost::asio::write(server_socket, boost::asio::buffer("test"));

    // Run timeout and read operations.
    io_service.run();
    assert(client.status() == client::read_success);
  }

  // Scenario 3: no timeout, failed read
  // The server closes the connection before the timeout, causing the
  // async_read operation to fail and cancel the async_wait operation.
  {
    std::cout << "[Scenario 3: no timeout, failed read]" << std::endl;
    // Create and connect I/O objects.
    tcp::socket server_socket(io_service);
    tcp::socket client_socket(io_service);
    connect_sockets(acceptor, server_socket, client_socket);

    // Start read with timeout on client.
    client client(client_socket);
    client.async_read_with_timeout(boost::posix_time::seconds(10));

    // Allow do_read_with_timeout to intiate actual operations.
    io_service.run_one();

    // Close the socket.
    server_socket.close();

    // Run timeout and read operations.
    io_service.run();
    assert(client.status() == client::read_failure);
  }

  // Scenario 4: timeout and read success
  // The server writes data, but the io_service is not ran until the
  // timer has had time to expire.  In this case, both the await_wait and
  // asnyc_read operations complete, but the order in which the
  // handlers run is indeterminiate.
  {
    std::cout << "[Scenario 4: timeout and read success]" << std::endl;
    // Create and connect I/O objects.
    tcp::socket server_socket(io_service);
    tcp::socket client_socket(io_service);
    connect_sockets(acceptor, server_socket, client_socket);

    // Start read with timeout on client.
    client client(client_socket);
    client.async_read_with_timeout(boost::posix_time::seconds(0));

    // Allow do_read_with_timeout to intiate actual operations.
    io_service.run_one();

    // Allow the timeout to expire, the write to the client, causing both
    // operations to complete with success.
    boost::this_thread::sleep_for(boost::chrono::seconds(1));
    boost::asio::write(server_socket, boost::asio::buffer("test"));

    // Run timeout and read operations.
    io_service.run();
    assert(   (client.status() == client::timeout)
           || (client.status() == client::read_success));
  }
}

И его вывод:

[Scenario 1: timeout]
timeout occured
read failed: Operation canceled
[Scenario 2: no timeout, succesful read]
read was succesful
timeout cancelled
[Scenario 3: no timeout, failed read]
read failed: End of file
timeout cancelled
[Scenario 4: timeout and read success]
read was succesful
timeout occured, but handle_read ran first
Другие вопросы по тегам