Пул потоков с использованием boost asio

Я пытаюсь создать ограниченный класс пула потоков, используя boost::asio. Но я застрял в одной точке, может кто-нибудь мне помочь.

Единственная проблема - это место, где я должен уменьшить счетчик?

код не работает, как ожидалось.

проблема в том, что я не знаю, когда мой поток завершит выполнение и как я узнаю, что он вернулся в пул

#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>

using namespace std;
using namespace boost;

class ThreadPool
{
    static int count;
    int NoOfThread;
    thread_group grp;
    mutex mutex_;
    asio::io_service io_service;
    int counter;
    stack<thread*> thStk ;

public:
    ThreadPool(int num)
    {   
        NoOfThread = num;
        counter = 0;
        mutex::scoped_lock lock(mutex_);

        if(count == 0)
            count++;
        else
            return;

        for(int i=0 ; i<num ; ++i)
        {
            thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
        }
    }
    ~ThreadPool()
    {
        io_service.stop();
        grp.join_all();
    }

    thread* getThread()
    {
        if(counter > NoOfThread)
        {
            cout<<"run out of threads \n";
            return NULL;
        }

        counter++;
        thread* ptr = thStk.top();
        thStk.pop();
        return ptr;
    }
};
int ThreadPool::count = 0;


struct callable
{
    void operator()()
    {
        cout<<"some task for thread \n";
    }
};

int main( int argc, char * argv[] )
{

    callable x;
    ThreadPool pool(10);
    thread* p = pool.getThread();
    cout<<p->get_id();

    //how i can assign some function to thread pointer ?
    //how i can return thread pointer after work done so i can add 
//it back to stack?


    return 0;
}

1 ответ

Решение

Короче говоря, вам нужно обернуть предоставленную пользователем задачу другой функцией, которая будет:

  • Вызвать пользовательскую функцию или вызываемый объект.
  • Заблокируйте мьютекс и уменьшите счетчик.

Возможно, я не понимаю все требования для этого пула потоков. Таким образом, для ясности, вот точный список требований, которые я считаю:

  • Пул управляет временем жизни потоков. Пользователь не должен иметь возможность удалять потоки, которые находятся в пуле.
  • Пользователь может назначить задачу для пула ненавязчивым способом.
  • Когда задача назначается, если все потоки в пуле в настоящее время выполняют другие задачи, то эта задача отбрасывается.

Прежде чем я представлю реализацию, я хотел бы подчеркнуть несколько ключевых моментов:

  • После запуска потока он будет работать до его завершения, отмены или завершения. Функция, которую выполняет поток, не может быть переназначена. Чтобы позволить одному потоку выполнять несколько функций в течение своей жизни, поток захочет запустить функцию, которая будет читать из очереди, например io_service::run() и вызываемые типы размещаются в очереди событий, например, из io_service::post(),
  • io_service::run() возвращается, если в io_service, io_service останавливается или выдается исключение из обработчика, который выполнялся потоком. Предотвращать io_serivce::run() от возвращения, когда нет незаконченной работы, io_service::work класс может быть использован.
  • Определение требований к типу задачи (т. Е. Тип задачи должен вызываться object() синтаксис) вместо того, чтобы требовать тип (то есть задача должна наследоваться от process), обеспечивает большую гибкость для пользователя. Это позволяет пользователю предоставлять задачу в виде указателя функции или типа, обеспечивающего нулевой operator(),

Реализация с использованием boost::asio:

#include <boost/asio.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  boost::asio::io_service io_service_;
  boost::asio::io_service::work work_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : work_( io_service_ ),
      available_( pool_size )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &boost::asio::io_service::run,
                                           &io_service_ ) );
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Force all threads to return from io_service::run().
    io_service_.stop();

    // Suppress all exceptions.
    try
    {
      threads_.join_all();
    }
    catch ( const std::exception& ) {}
  }

  /// @brief Adds a task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Post a wrapped task into the queue.
    io_service_.post( boost::bind( &thread_pool::wrap_task, this,
                                   boost::function< void() >( task ) ) );
  }

private:
  /// @brief Wrap a task so that the available count can be increased once
  ///        the user provided task has completed.
  void wrap_task( boost::function< void() > task )
  {
    // Run the user supplied task.
    try
    {
      task();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}

    // Task has finished, so increment count of available threads.
    boost::unique_lock< boost::mutex > lock( mutex_ );
    ++available_;
  }
};

Несколько комментариев о реализации:

  • Обработка исключений должна происходить вокруг задачи пользователя. Если пользовательская функция или вызываемый объект выдает исключение, которое не имеет типа boost::thread_interrupted, затем std::terminate() называется. Это результат исключений Boost.Thread в поведении потоковых функций. Стоит также прочитать эффект Boost.Asio для исключений, генерируемых обработчиками.
  • Если пользователь предоставляет task с помощью boost::bind тогда вложенный boost::bind не удастся скомпилировать. Требуется один из следующих параметров:
    • Не поддерживается task создано boost::bind,
    • Мета-программирование для выполнения ветвления во время компиляции в зависимости от того, является ли тип пользователя результатом boost::bind чтобы boost::protect может быть использован, как boost::protect только функционирует должным образом на определенных функциональных объектах.
    • Используйте другой тип, чтобы передать task косвенно Я решил использовать boost::function для читабельности за счет потери точного типа. boost::tuple будучи немного менее читабельным, его также можно использовать для сохранения точного типа, как видно из примера сериализации Boost.Asio.

Код приложения теперь может использовать thread_pool тип ненавязчиво:

void work() {};

struct worker
{
  void operator()() {};
};

void more_work( int ) {};

int main()
{ 
  thread_pool pool( 2 );
  pool.run_task( work );                        // Function pointer.
  pool.run_task( worker() );                    // Callable object.
  pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}

thread_pool может быть создан без Boost.Asio и может быть немного проще для сопровождающих, так как им больше не нужно знать о Boost.Asio поведение, например, когда io_service::run() вернуть, а что такое io_service::work объект:

#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  std::queue< boost::function< void() > > tasks_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
  boost::condition_variable condition_;
  bool running_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : available_( pool_size ),
      running_( true )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Set running flag to false then notify all threads.
    {
      boost::unique_lock< boost::mutex > lock( mutex_ );
      running_ = false;
      condition_.notify_all();
    }

    try
    {
      threads_.join_all();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}
  }

  /// @brief Add task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Set task and signal condition variable so that a worker thread will
    // wake up andl use the task.
    tasks_.push( boost::function< void() >( task ) );
    condition_.notify_one();
  }

private:
  /// @brief Entry point for pool threads.
  void pool_main()
  {
    while( running_ )
    {
      // Wait on condition variable while the task is empty and the pool is
      // still running.
      boost::unique_lock< boost::mutex > lock( mutex_ );
      while ( tasks_.empty() && running_ )
      {
        condition_.wait( lock );
      }
      // If pool is no longer running, break out.
      if ( !running_ ) break;

      // Copy task locally and remove from the queue.  This is done within
      // its own scope so that the task object is destructed immediately
      // after running the task.  This is useful in the event that the
      // function contains shared_ptr arguments bound via bind.
      {
        boost::function< void() > task = tasks_.front();
        tasks_.pop();

        lock.unlock();

        // Run the task.
        try
        {
          task();
        }
        // Suppress all exceptions.
        catch ( const std::exception& ) {}
      }

      // Task has finished, so increment count of available threads.
      lock.lock();
      ++available_;
    } // while running_
  }
};
Другие вопросы по тегам