Пул потоков C++11 - задачи с входными параметрами

Я пытаюсь использовать простой пример пула потоков из книги Энтони Уильямса "Параллельность C++ в действии". Я даже нашел код здесь (класс thread_pool) в одном из постов: Синхронизация задач, но у меня другой вопрос. Я хотел бы отправить задачу (функцию-член) в очередь со следующей подписью:

class A;
class B;
bool MyClass::Func(A*, B*); 

Как мне нужно изменить класс thread_pool или как мне упаковать мою функцию в какой-нибудь void F (), который предполагается использовать в качестве задачи в этом примере? Вот самая важная часть класса для меня (подробности см. По ссылке выше):

class thread_pool 
{
  thread_safe_queue<std::function<void()> work_queue; // bool MyClass::Func(a,b) ??

  void worker_thread() {
   while(!done) {         
    std::function<void()> task;
    if(work_queue.try_pop(task)) {
     task();  // how should my function MyClass::Func(a,b) be called here?                    
    }
    else {
     std::this_thread::yield();
    }
   }
  }

  // -- Submit a task to the thread pool
  template <typename FunctionType>
  void submit(FunctionType f) {
  work_queue.push(std::function<void()>(f)); // how should bool MyClassFunc(A*, B*) be submitted here
 }

}

И, наконец, как я могу вызвать функцию отправки в моем коде?

Большое спасибо за вашу помощь (к сожалению, я пока не очень опытен в использовании всех возможностей C++11, поэтому, возможно, и здесь мне нужна помощь, но ответ на этот вопрос можно начать с:)),

2 ответа

Вы должны привязать параметры к значению при вставке задачи в очередь. Это означает, что вам нужно создать оболочку для вашей функции, которая хранит значения для this и значения для двух параметров функции. Есть много способов сделать это, например лямбда-функции или std::bind,

work_queue.push_back( [obj, a, b]() {obj->Func(a,b)} );
work_queue.push_back( std::bind(&MyClass::Func, obj, a, b) );

Ваша функция отправки должна принять эти параметры и создать привязку, например

template<typename F, typename... Args>
void submit(F f, Args&&... args) {
    work_queue.push_back( std::bind(f, std::forward<Args>(args)...) );
}

Для функции-члена может быть удобно создать специальную перегрузку для функций-членов и объектов.

Я написал что-то, что делает что-то (очень) похожее на это раньше. Я выложу код здесь, и вы можете посмотреть. GenCmd - это оболочка функций. Очередь выглядит следующим образом и используется / определяется в Impl (код опущен). Вам нужно только взглянуть на реализацию GenCmd, поскольку она содержит необходимую работу.

ConcurrentQueue<std::unique_ptr<Cmd>> cqueue_;

Я обернул std::function<>, чтобы быть полиморфным в очереди. std_utility содержит make_index_sequence, которая используется для извлечения значений из кортежа (google make_index_sequence, чтобы найти реализацию где-нибудь, если она еще не является частью вашей библиотеки std).

#include <functional>
#include <memory>
#include <iostream>
#include <utility>
#include <boost/noncopyable.hpp>

class CmdExecutor : public boost::noncopyable
{
  public:
    CmdExecutor(std::ostream& errorOutputStream);
    ~CmdExecutor();

    template <class Receiver, class ... FArgs, class ... CArgs >
      void process(Receiver& receiver, void (Receiver::*f)(FArgs...), CArgs&&... args)
    {
      process(std::unique_ptr<Cmd>(new GenCmd<void(Receiver,FArgs...)>(f, receiver, std::forward<CArgs>(args)...)));
    }

  private:
    class Cmd
    {
      public:
        virtual void execute() = 0;
        virtual ~Cmd(){}
    };

    template <class T> class GenCmd;

    template <class Receiver, class ... Args>
    class GenCmd<void(Receiver, Args...)> : public Cmd
    {
      public:
        template <class FuncT, class ... CArgs>
        GenCmd(FuncT&& f, Receiver& receiver, CArgs&&... args)
          : call_(std::move(f)),
            receiver_(receiver),
            args_(args...)
        {
        }
        //We must convert references to values...

        virtual void execute()
        {
          executeImpl(std::make_index_sequence<sizeof...(Args)>{});
        }

      private:
        template <std::size_t ... Is>
        void executeImpl(std::index_sequence<Is...>)
        {
          // We cast the values in the tuple to the original type (of Args...)
          call_(receiver_, static_cast<Args>(std::get<Is>(args_))...);
        }

        std::function<void(Receiver&, Args...)> call_;
        Receiver& receiver_;
        // NOTE:
        // References converted to values for safety sake, as they are likely
        // to not be around when this is executed in other context.
        std::tuple<typename std::remove_reference<Args>::type...> args_;
    };

    void process(std::unique_ptr<Cmd> command);

    class Impl;
    Impl* pimpl_;
};

Это в основном используется следующим образом:

...
CmdExecutor context_;
...

void MyClass::myFunction()
{
  ArgX x;
  ArgY y;

  context_.process(*this, &MyClass::someFunction, x, y);
}

Из этого видно, что процесс выполняет перенос типа функции-члена и преобразует его в базовый тип для хранения в очереди. Это позволяет использовать несколько типов аргументов. Я выбрал использование полиморфизма времени выполнения для хранения типов функций, отсюда и производная от GenCmd.

Примечание. Если вызванная функция получает значение rvalue (Arg&&), сохраненный тип преобразуется в исходный тип, что вызывает перемещение и делает соответствующий аргумент команды (который будет вызван только один раз) пустым (это, по крайней мере, намерение) - не проверено...)

Другие вопросы по тегам