Сохранить список переменных-переменных для вызовов fprintf
Я пишу тяжелую многопоточную [>170 потоков] программу на C++11. Каждый поток записывает информацию в один файл, используемый всеми потоками. Из соображений производительности я хочу создать поток журнала, который записывает информацию через fprintf()
в глобальный файл. Я понятия не имею, как организовать структуру, в которую рабочие потоки записывают информацию, которую затем можно прочитать потоком журнала.
Почему я не звоню sprintf()
в каждом рабочем потоке, а затем просто предоставить выходной буфер для потока журнала? Для форматированного вывода в файл журнала я использую locale
в fprintf()
функции, которые отличаются от остальной части потока. Поэтому мне придется постоянно переключать и блокировать / охранять xprintf()
звонки, чтобы отличить locale
выход. В ветке журнала у меня есть один locale
настройка используется для всего вывода, в то время как рабочие потоки имеют свои locale
версия.
Другая причина для потока журнала состоит в том, что я должен "сгруппировать" вывод, иначе информация из каждого рабочего потока не будет в блоке:
Неправильно:
Information A Thread #1
Information A Thread #2
Information B Thread #1
Information B Thread #2
Правильный:
Information A Thread #1
Information B Thread #1
Information A Thread #2
Information B Thread #2
Для достижения этой группировки я должен защищать вывод в каждом рабочем потоке, который замедляет время выполнения потока.
Как я могу сохранить va_list
в структуру таким образом, он может быть прочитан потоком журнала и передан обратно fprintf()
?
1 ответ
Я не понимаю, как это было бы легко сделать с помощью устаревшего C vprintf
с va_list
s. Поскольку вы хотите передавать вещи между потоками, рано или поздно вам придется каким-то образом использовать кучу.
Ниже представлено решение, которое использует Boost.Format для форматирования и Boost.Variant для передачи параметров. Пример завершен и работает, если вы объедините следующие блоки кода по порядку. Если вы компилируете с GCC, вам нужно передать -pthread
флаг компоновщика. И, конечно, вам также понадобятся две библиотеки Boost, которые предназначены только для заголовков. Вот заголовки, которые мы будем использовать.
#include <condition_variable>
#include <iostream>
#include <list>
#include <locale>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <boost/format.hpp>
#include <boost/variant.hpp>
Во-первых, нам нужен какой-то механизм для асинхронного выполнения некоторых задач, в этом случае выведите наши сообщения регистрации. Поскольку концепция является общей, я использую "абстрактный" базовый класс Spooler
за это. Его код основан на выступлении Херба Саттера "Программирование без блокировки (или жонглирование бритвенными лезвиями)" на CppCon 2014 ( часть 1, часть 2). Я не буду вдаваться в подробности об этом коде, потому что он в основном строит леса, не связанные напрямую с вашим вопросом, и я предполагаю, что у вас уже есть эта функциональность. мой Spooler
использует std::list
защищен std::mutex
как очередь задач. Возможно, стоит рассмотреть возможность использования структуры данных без блокировки.
class Spooler
{
private:
bool done_ {};
std::list<std::function<void(void)>> queue_ {};
std::mutex mutex_ {};
std::condition_variable condvar_ {};
std::thread worker_ {};
public:
Spooler() : worker_ {[this](){ work(); }}
{
}
~Spooler()
{
auto poison = [this](){ done_ = true; };
this->submit(std::move(poison));
if (this->worker_.joinable())
this->worker_.join();
}
protected:
void
submit(std::function<void(void)> task)
{
// This is basically a push_back but avoids potentially blocking
// calls while in the critical section.
decltype(this->queue_) tmp {std::move(task)};
{
std::unique_lock<std::mutex> lck {this->mutex_};
this->queue_.splice(this->queue_.cend(), tmp);
}
this->condvar_.notify_all();
}
private:
void
work()
{
do
{
std::unique_lock<std::mutex> lck {this->mutex_};
while (this->queue_.empty())
this->condvar_.wait(lck);
const auto task = std::move(this->queue_.front());
this->queue_.pop_front();
lck.unlock();
task();
}
while (!this->done_);
}
};
От Spooler
Теперь мы выведем Logger
что (в частном порядке) наследует свои асинхронные возможности от Spooler
и добавляет определенные функции ведения журнала. Он имеет только один член функции с именем log
которая принимает в качестве параметров строку формата и ноль или более аргументов для форматирования в нее как std::vector
из boost::variant
s.
К сожалению, это ограничивает нас фиксированным числом типов, которые мы можем поддерживать, но это не должно быть большой проблемой, так как C printf
также не поддерживает произвольные типы. Ради этого примера я использую только int
а также double
но вы можете расширить список с помощью std::string
s, void *
указатели или что там у тебя.
log
Функция создает лямбда-выражение, которое создает boost::format
объект, передает все аргументы, а затем записывает его std::log
или куда вы хотите отправить отформатированное сообщение.
Конструктор boost::format
имеет перегрузку, которая принимает строку формата и локаль. Возможно, вас заинтересует этот вариант, так как вы упомянули установку пользовательской локали в комментариях. Обычный конструктор принимает только один аргумент - строку формата.
Обратите внимание, как все форматирование и вывод выполняется в потоке спулера.
class Logger : Spooler
{
public:
void
log(const std::string& fmt,
const std::vector<boost::variant<int, double>>& args)
{
auto task = [fmt, args](){
boost::format msg {fmt, std::locale {"C"}}; // your locale here
for (const auto& arg : args)
msg % arg; // feed the next argument
std::clog << msg << std::endl; // print the formatted message
};
this->submit(std::move(task));
}
};
Это все, что нужно. Теперь мы можем использовать Logger
как в этом примере. Важно, чтобы все рабочие потоки были join()
ред до Logger
уничтожен или не будет обрабатывать все сообщения.
int
main()
{
Logger logger {};
std::vector<std::thread> threads {};
std::random_device rnddev {};
for (int i = 0; i < 4; ++i)
{
const auto seed = rnddev();
auto task = [&logger, i, seed](){
std::default_random_engine rndeng {seed};
std::uniform_real_distribution<double> rnddist {0.0, 0.5};
for (double p = 0.0; p < 1.0; p += rnddist(rndeng))
logger.log("thread #%d is %6.2f %% done", {i, 100.0 * p});
logger.log("thread #%d has completed its work", {i});
};
threads.emplace_back(std::move(task));
}
for (auto& thread : threads)
thread.join();
}
Возможный вывод:
thread #1 is 0.00 % done
thread #0 is 0.00 % done
thread #0 is 26.84 % done
thread #0 is 76.15 % done
thread #3 is 0.00 % done
thread #0 has completed its work
thread #3 is 34.70 % done
thread #3 is 78.92 % done
thread #3 is 91.89 % done
thread #3 has completed its work
thread #1 is 26.98 % done
thread #1 is 73.84 % done
thread #1 has completed its work
thread #2 is 0.00 % done
thread #2 is 10.17 % done
thread #2 is 29.85 % done
thread #2 is 79.03 % done
thread #2 has completed its work