Сохранить список переменных-переменных для вызовов fprintf

Я пишу тяжелую многопоточную [>170 потоков] программу на C++11. Каждый поток записывает информацию в один файл, используемый всеми потоками. Из соображений производительности я хочу создать поток журнала, который записывает информацию через fprintf() в глобальный файл. Я понятия не имею, как организовать структуру, в которую рабочие потоки записывают информацию, которую затем можно прочитать потоком журнала.

Почему я не звоню sprintf() в каждом рабочем потоке, а затем просто предоставить выходной буфер для потока журнала? Для форматированного вывода в файл журнала я использую locale в fprintf() функции, которые отличаются от остальной части потока. Поэтому мне придется постоянно переключать и блокировать / охранять xprintf() звонки, чтобы отличить locale выход. В ветке журнала у меня есть один locale настройка используется для всего вывода, в то время как рабочие потоки имеют свои locale версия.

Другая причина для потока журнала состоит в том, что я должен "сгруппировать" вывод, иначе информация из каждого рабочего потока не будет в блоке:

Неправильно:

Information A Thread #1
Information A Thread #2
Information B Thread #1
Information B Thread #2

Правильный:

Information A Thread #1
Information B Thread #1
Information A Thread #2
Information B Thread #2

Для достижения этой группировки я должен защищать вывод в каждом рабочем потоке, который замедляет время выполнения потока.

Как я могу сохранить va_list в структуру таким образом, он может быть прочитан потоком журнала и передан обратно fprintf()?

1 ответ

Решение

Я не понимаю, как это было бы легко сделать с помощью устаревшего C vprintf с va_list s. Поскольку вы хотите передавать вещи между потоками, рано или поздно вам придется каким-то образом использовать кучу.

Ниже представлено решение, которое использует Boost.Format для форматирования и Boost.Variant для передачи параметров. Пример завершен и работает, если вы объедините следующие блоки кода по порядку. Если вы компилируете с GCC, вам нужно передать -pthread флаг компоновщика. И, конечно, вам также понадобятся две библиотеки Boost, которые предназначены только для заголовков. Вот заголовки, которые мы будем использовать.

#include <condition_variable>
#include <iostream>
#include <list>
#include <locale>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <boost/format.hpp>
#include <boost/variant.hpp>

Во-первых, нам нужен какой-то механизм для асинхронного выполнения некоторых задач, в этом случае выведите наши сообщения регистрации. Поскольку концепция является общей, я использую "абстрактный" базовый класс Spooler за это. Его код основан на выступлении Херба Саттера "Программирование без блокировки (или жонглирование бритвенными лезвиями)" на CppCon 2014 ( часть 1, часть 2). Я не буду вдаваться в подробности об этом коде, потому что он в основном строит леса, не связанные напрямую с вашим вопросом, и я предполагаю, что у вас уже есть эта функциональность. мой Spooler использует std::list защищен std::mutex как очередь задач. Возможно, стоит рассмотреть возможность использования структуры данных без блокировки.

class Spooler
{
private:

  bool done_ {};
  std::list<std::function<void(void)>> queue_ {};
  std::mutex mutex_ {};
  std::condition_variable condvar_ {};
  std::thread worker_ {};

public:

  Spooler() : worker_ {[this](){ work(); }}
  {
  }

  ~Spooler()
  {
    auto poison = [this](){ done_ = true; };
    this->submit(std::move(poison));
    if (this->worker_.joinable())
      this->worker_.join();
  }

protected:

  void
  submit(std::function<void(void)> task)
  {
    // This is basically a push_back but avoids potentially blocking
    // calls while in the critical section.
    decltype(this->queue_) tmp {std::move(task)};
    {
      std::unique_lock<std::mutex> lck {this->mutex_};
      this->queue_.splice(this->queue_.cend(), tmp);
    }
    this->condvar_.notify_all();
  }

private:

  void
  work()
  {
    do
      {
        std::unique_lock<std::mutex> lck {this->mutex_};
        while (this->queue_.empty())
          this->condvar_.wait(lck);
        const auto task = std::move(this->queue_.front());
        this->queue_.pop_front();
        lck.unlock();
        task();
      }
    while (!this->done_);
  }
};

От Spooler Теперь мы выведем Logger что (в частном порядке) наследует свои асинхронные возможности от Spooler и добавляет определенные функции ведения журнала. Он имеет только один член функции с именем log которая принимает в качестве параметров строку формата и ноль или более аргументов для форматирования в нее как std::vector из boost::variant s.

К сожалению, это ограничивает нас фиксированным числом типов, которые мы можем поддерживать, но это не должно быть большой проблемой, так как C printf также не поддерживает произвольные типы. Ради этого примера я использую только int а также double но вы можете расширить список с помощью std::string s, void * указатели или что там у тебя.

log Функция создает лямбда-выражение, которое создает boost::format объект, передает все аргументы, а затем записывает его std::log или куда вы хотите отправить отформатированное сообщение.

Конструктор boost::format имеет перегрузку, которая принимает строку формата и локаль. Возможно, вас заинтересует этот вариант, так как вы упомянули установку пользовательской локали в комментариях. Обычный конструктор принимает только один аргумент - строку формата.

Обратите внимание, как все форматирование и вывод выполняется в потоке спулера.

class Logger : Spooler
{
 public:

  void
  log(const std::string& fmt,
      const std::vector<boost::variant<int, double>>& args)
  {
    auto task = [fmt, args](){
      boost::format msg {fmt, std::locale {"C"}};  // your locale here
      for (const auto& arg : args)
        msg % arg;  // feed the next argument
      std::clog << msg << std::endl;  // print the formatted message
    };
    this->submit(std::move(task));
  }
};

Это все, что нужно. Теперь мы можем использовать Logger как в этом примере. Важно, чтобы все рабочие потоки были join() ред до Logger уничтожен или не будет обрабатывать все сообщения.

int
main()
{
  Logger logger {};
  std::vector<std::thread> threads {};
  std::random_device rnddev {};
  for (int i = 0; i < 4; ++i)
    {
      const auto seed = rnddev();
      auto task = [&logger, i, seed](){
        std::default_random_engine rndeng {seed};
        std::uniform_real_distribution<double> rnddist {0.0, 0.5};
        for (double p = 0.0; p < 1.0; p += rnddist(rndeng))
          logger.log("thread #%d is %6.2f %% done", {i, 100.0 * p});
        logger.log("thread #%d has completed its work", {i});
      };
      threads.emplace_back(std::move(task));
    }
  for (auto& thread : threads)
    thread.join();
}

Возможный вывод:

thread #1 is   0.00 % done
thread #0 is   0.00 % done
thread #0 is  26.84 % done
thread #0 is  76.15 % done
thread #3 is   0.00 % done
thread #0 has completed its work
thread #3 is  34.70 % done
thread #3 is  78.92 % done
thread #3 is  91.89 % done
thread #3 has completed its work
thread #1 is  26.98 % done
thread #1 is  73.84 % done
thread #1 has completed its work
thread #2 is   0.00 % done
thread #2 is  10.17 % done
thread #2 is  29.85 % done
thread #2 is  79.03 % done
thread #2 has completed its work
Другие вопросы по тегам