Boost:: обработать вывод пустых строк

Я разрабатываю приложение, в котором мне нужно запускать и останавливать различные исполняемые файлы в зависимости от ввода пользователя. Мне бы хотелось, чтобы моя "основная" программа работала как обычно, пока выполняются эти исполняемые файлы, то есть не ожидала их завершения, которое теоретически могло бы быть заражено. Кроме того, мне нужно иметь возможность получать std_out и отправлять std_in этим исполняемым файлам.

На данный момент у меня есть настройка, где у меня есть класс диспетчера процессов:

class ProcessManager {
private:
    std::vector<patchProcess> processList;
    boost::process::group processGroup;
public:
    ProcessManager();
    void addNew(std::string name,std::string command, std::string args);
    void killAll();
    void printAllIn();
};

Где процесс исправления:

struct patchProcess {
    std::string name;
    boost::process::child *process;
    std::shared_ptr<boost::process::ipstream> procOutStream;
};

Где я могу запустить / добавить новый процесс с функцией

void bbefxProcessManager::addNew(std::string name, std::string command, std::string args) {
    LOG(info) << "Creating process for patch " << name;
    patchProcess pp;
    pp.name = name;
    pp.procOutStream = std::shared_ptr<boost::process::ipstream>(new boost::process::ipstream);
    boost::process::child newProc(command,args,processGroup,boost::process::std_out > *pp.procOutStream);
    pp.process = &newProc;
    processList.push_back(pp);
}

И мои попытки печати:

void bbefxProcessManager::printAllIn() {
    std::string line;
        for (auto &proc : processList) {
            std::getline(*proc.procOutStream, line);
            std::cout << line << std::endl;
        }
}

Этот код успешно запускает процесс, однако readAllIn выдает пустой вывод. У меня такое чувство, что я делаю что-то ужасно не так std::shared_ptr<boost::process::ipstream> procOutStream;, Мое обоснование этого заключается в том, что я использую push_back в мой список процессов (вектор структуры), поэтому он должен быть копируемым. Я могу получить результат теста exec без использования структуры patchProcess и этих общих указателей, но это делает управление трудным / грязным. Я также могу подтвердить, что если я попытаюсь прочитать вывод в функции addNew с чем-то вроде:

while(true) {
        *pp.procOutStream >> line;
        std::cout << line << std::endl;

    }

Я получаю вывод моего исполняемого файла. Так значит ли это, что что-то не так с конструкторами копирования?

2 ответа

Решение

Перед вашими правками я начал работать над действительно асинхронным подходом:

Давайте уберем формальности:

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>

namespace ba = boost::asio;
namespace bp = boost::process;

#include <iostream>
#define LOG(x) std::clog

Теперь давайте создадим ProcessManager который запускает все процессы на одном io_service это отключение в деструкторе.

Служба ввода-вывода используется для планирования всей работы (например, асинхронный ввод-вывод). Я

  • случайно решил сосредоточиться на линейно-ориентированных операциях ввода-вывода
  • решил, что, вероятно, нет причин использовать более 1 потока ввода-вывода, но на случай, если strand это правильно синхронизирует операции по отношению к ребенку.
#include <map>
#include <list>
#include <thread>
class ProcessManager { // ugh naming smell
    using error_code = boost::system::error_code;
  private:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _keep{_service};
    boost::process::group _group;
    std::thread io_thread;

    struct patchProcess : std::enable_shared_from_this<patchProcess> {
        using ptr = std::shared_ptr<patchProcess>;
        static ptr start(std::string command, std::vector<std::string> args, ProcessManager& mgr) {
            ptr p(new patchProcess(std::move(command), std::move(args), mgr));
            p->output_read_loop();
            return p;
        }

        boost::optional<std::string> getline() {
            std::lock_guard<std::mutex> lk(_mx);
            std::string s;
            if (has_newline(_output.data()) && std::getline(std::istream(&_output), s))
                return s;
            return boost::none;
        }

        void write(std::string message) {
            std::lock_guard<std::mutex> lk(_mx);
            _input_bufs.push_back({false, std::move(message)});

            if (_input_bufs.size() == 1)
                input_write_loop();
        }

        void close_stdin() {
            std::lock_guard<std::mutex> lk(_mx);
            if (_input_bufs.empty()) {
                _strand.post([this, self=shared_from_this()] { _stdin.close(); });
            } else {
                _input_bufs.push_back({true, {}});
            }
        }

        bool is_running() { return _process.running(); }

      private:
        patchProcess(std::string command, std::vector<std::string> args, ProcessManager& mgr)
            : _strand(mgr._service),
              _stdout(mgr._service), _stdin(mgr._service),
              _process(command, args, mgr._group, bp::std_out > _stdout, bp::std_in < _stdin, mgr._service)
        { }

        void output_read_loop() {
            ba::async_read_until(_stdout, pending_output, "\n", _strand.wrap([this, self=shared_from_this()](error_code ec, size_t /*transferred*/) {
                if (!ec) {
                    std::lock_guard<std::mutex> lk(_mx);
                    std::ostream(&_output) << &pending_output;
                    output_read_loop();
                }
            }));
        }

        void input_write_loop() { // assumes _mx locked
            if (!_input_bufs.empty()) {
                auto& msg = _input_bufs.front();
                if (msg.eof) {
                    _strand.post([this, self=shared_from_this()] { _stdin.close(); });
                } else {
                    ba::async_write(_stdin, ba::buffer(_input_bufs.front().pay_load), 
                        _strand.wrap([this, self=shared_from_this()](error_code ec, size_t /*transferred*/) {
                            std::lock_guard<std::mutex> lk(_mx);
                            _input_bufs.pop_front();
                            if (!ec)
                                input_write_loop();
                        }));
                }
            }
        }

        ba::io_service::strand _strand; // thread-safe

        // strand-local
        bp::async_pipe _stdout, _stdin;
        bp::child _process;
        ba::streambuf pending_output;

        // mutex protected
        std::mutex mutable _mx;
        struct out_message { bool eof; std::string pay_load; };
        std::list<out_message> _input_bufs; // iterator stability again!
        ba::streambuf _output;

        // static helpers
        template <typename T>
        static bool has_newline(T buffer) {
            return std::find(buffers_begin(buffer), buffers_end(buffer), '\n') != buffers_end(buffer);
        }
    };

    using Map = std::map<std::string, patchProcess::ptr>; // iterator stability required!
    Map processList;

    void eventloop() {
        for(;;) try {
            if (!_service.run()) break;
        } catch(std::exception const& e) {
            LOG(error) << "Exception in handler: " << e.what() << "\n";
        }
    }
  public:
    ProcessManager() : io_thread([this] { eventloop(); }) { }

    ~ProcessManager() {
        status(__FUNCTION__);
        _keep.reset();
        io_thread.join();
        status(__FUNCTION__);
    }

    void status(std::string const& caption = "Status") const {
        for (auto& p : processList) {
            LOG(info) << caption << ": '" << p.first << "' is " << (p.second->is_running()? "still running":"done") << "\n";
        }
    }

    patchProcess::ptr addNew(std::string name, std::string command, std::vector<std::string> args) {
        auto pit = processList.find(name);
        if (pit != processList.end()) {
            if (pit->second->is_running()) {
                LOG(error) << "Process already running ('" << name << "')\n";
                return {};
            }
            // TODO make sure process cleaned up etc.
        }
        LOG(info) << "Creating process for patch " << name << "\n";
        return processList[name] = patchProcess::start(std::move(command), std::move(args), *this);
    }
};

демос

Самый наивный пробег будет:

int main() {
    ProcessManager pm;
}

Который, как и ожидалось, возвращается после бездействия. Далее мы попробуем

int main() {
    ProcessManager pm;
    pm.addNew("sleeper", "/bin/bash", {"-c", "sleep 3" });
}

Который предсказуемо ждет 3 секунды, прежде чем выйти. Это печатает:

Creating process for patch sleeper
~ProcessManager: 'sleeper' is still running
~ProcessManager: 'sleeper' is done

Но ПОДОЖДИТЕ! Разве вы не сказали, что не хотите ждать? Ну нет ни одного! В то же время вы можете делать все, что угодно. Это просто ProcessManager деструктор по умолчанию будет ждать, пока ребенок закончит.

Давайте сделаем некоторые IO:

Жить на Колиру

int main() {
    ProcessManager pm;

    auto ls  = pm.addNew("listing", "/bin/ls", {"-ltr" });

    boost::optional<std::string> l;

    while ((l = ls->getline()) || ls->is_running()) {
        if (l.is_initialized()) {
            std::cout << "ls: " << std::quoted(*l) << std::endl;
            l.reset();
        }
    }
}

Печать

Creating process for patch listing
ls: "total 172"
ls: "-rw-rw-rw- 1 2001 2000   5645 Feb 11 00:10 main.cpp"
ls: "-rwxr-xr-x 1 2001 2000 162784 Feb 11 00:10 a.out"
~ProcessManager: 'listing' is done
~ProcessManager: 'listing' is done

Чтобы понять, что процессы и их операции ввода-вывода являются синхронными, мы можем заменить

auto ls = pm.addNew("listing", "/bin/ls", {"-ltr" });

с чем-то более измененным во времени:

auto ls = pm.addNew("listing", "/bin/bash", {"-c", "ls -ltr | while read line; do sleep 1; echo \"$line\"; done" });

Теперь, чтобы сделать его действительно сложным, давайте добавим еще один дочерний процесс и отправим вывод ls к другому child:

Жить на Колиру

int main() {
    ProcessManager pm;

    auto ls  = pm.addNew("listing", "/bin/bash", {"-c", "ls -ltr | while read line; do sleep 1; echo \"$line\"; done" });
    auto xxd = pm.addNew("hex encoding", "/usr/bin/xxd", {});

    boost::optional<std::string> l, x;

    bool closed = false;
    while ((l || (l = ls->getline())) || (x || (x = xxd->getline())) || ls->is_running() || xxd->is_running()) {
        if (l.is_initialized()) {
            xxd->write(std::move(*l) + '\n');
            l.reset();
            std::cout << "[forwarded from ls to xxd]" << std::endl;
        } else {
            if (!closed && !ls->is_running()) {
                std::cout << "[closing input to xxd]" << std::endl;
                xxd->close_stdin();
                closed = true;
            }
        }

        if (x.is_initialized()) {
            std::cout << std::quoted(*x) << std::endl;
            x.reset();
        }
    }
}

Теперь, в Coliru список слишком мал, чтобы быть интересным, но в моей системе вы получите такой вывод:

Creating process for patch listing
Creating process for patch hex encoding
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
"00000000: 746f 7461 6c20 3733 3635 0a2d 7277 2d72  total 7365.-rw-r"
"00000010: 772d 722d 2d20 2031 2073 6568 6520 7365  w-r--  1 sehe se"
"00000020: 6865 2020 2020 3133 3737 206d 6569 2031  he    1377 mei 1"
"00000030: 3020 2032 3031 3720 636d 616b 655f 696e  0  2017 cmake_in"
"00000040: 7374 616c 6c2e 636d 616b 650a 6c72 7778  stall.cmake.lrwx"
"00000050: 7277 7872 7778 2020 3120 7365 6865 2073  rwxrwx  1 sehe s"
"00000060: 6568 6520 2020 2020 2020 3820 6d65 6920  ehe       8 mei "
"00000070: 3234 2020 3230 3137 206d 6169 6e2e 6370  24  2017 main.cp"
"00000080: 7020 2d3e 2074 6573 742e 6370 700a 2d72  p -> test.cpp.-r"
"00000090: 772d 7277 2d72 2d2d 2020 3120 7365 6865  w-rw-r--  1 sehe"
"000000a0: 2073 6568 6520 2020 2020 3531 3420 7365   sehe     514 se"
"000000b0: 7020 3133 2030 383a 3336 2063 6f6d 7069  p 13 08:36 compi"
"000000c0: 6c65 5f63 6f6d 6d61 6e64 732e 6a73 6f6e  le_commands.json"
"000000d0: 0a2d 7277 2d72 772d 722d 2d20 2031 2073  .-rw-rw-r--  1 s"
"000000e0: 6568 6520 7365 6865 2020 2020 3135 3834  ehe sehe    1584"
"000000f0: 2073 6570 2032 3020 3232 3a30 3320 576f   sep 20 22:03 Wo"
"00000100: 7264 436f 756e 7465 722e 680a 2d72 772d  rdCounter.h.-rw-"
"00000110: 7277 2d72 2d2d 2020 3120 7365 6865 2073  rw-r--  1 sehe s"
"00000120: 6568 6520 2020 2020 3336 3920 7365 7020  ehe     369 sep "
"00000130: 3233 2030 333a 3131 2063 6f6d 6d6f 6e2e  23 03:11 common."
"00000140: 680a 2d72 772d 7277 2d72 2d2d 2020 3120  h.-rw-rw-r--  1 "
"00000150: 7365 6865 2073 6568 6520 2020 2020 3533  sehe sehe     53"
"00000160: 3920 7365 7020 3233 2030 333a 3131 2073  9 sep 23 03:11 s"
"00000170: 7472 7563 7473 616d 706c 652e 6870 700a  tructsample.hpp."
"00000180: 2d72 772d 7277 2d72 2d2d 2020 3120 7365  -rw-rw-r--  1 se"
"00000190: 6865 2073 6568 6520 2020 2032 3335 3220  he sehe    2352 "
"000001a0: 7365 7020 3238 2032 333a 3230 2061 6461  sep 28 23:20 ada"
"000001b0: 7074 6976 655f 7061 7273 6572 2e68 0a2d  ptive_parser.h.-"
"000001c0: 7277 2d72 772d 722d 2d20 2031 2073 6568  rw-rw-r--  1 seh"
"000001d0: 6520 7365 6865 2020 2020 3538 3738 2073  e sehe    5878 s"
"000001e0: 6570 2032 3820 3233 3a32 3120 6164 6170  ep 28 23:21 adap"
"000001f0: 7469 7665 5f70 6172 7365 722e 6370 700a  tive_parser.cpp."
"00000200: 2d72 772d 7277 2d72 2d2d 2020 3120 7365  -rw-rw-r--  1 se"
"00000210: 6865 2073 6568 6520 2020 2034 3232 3720  he sehe    4227 "
"00000220: 6f6b 7420 2034 2032 333a 3137 2070 686f  okt  4 23:17 pho"
"00000230: 656e 695f 7833 2e68 7070 0a2d 7277 2d72  eni_x3.hpp.-rw-r"
"00000240: 772d 722d 2d20 2031 2073 6568 6520 7365  w-r--  1 sehe se"
"00000250: 6865 2020 2031 3432 3035 2064 6563 2020  he   14205 dec  "
"00000260: 3620 3231 3a30 3820 434d 616b 6543 6163  6 21:08 CMakeCac"
"00000270: 6865 2e74 7874 0a2d 7277 2d72 772d 722d  he.txt.-rw-rw-r-"
"00000280: 2d20 2031 2073 6568 6520 7365 6865 2020  -  1 sehe sehe  "
"00000290: 2020 3630 3738 2064 6563 2031 3420 3032    6078 dec 14 02"
"000002a0: 3a35 3320 636f 6e6e 6563 7469 6f6e 2e68  :53 connection.h"
"000002b0: 7070 0a2d 7277 7872 7778 722d 7820 2031  pp.-rwxrwxr-x  1"
"000002c0: 2073 6568 6520 7365 6865 2020 2020 3136   sehe sehe    16"
"000002d0: 3736 206a 616e 2031 3220 3032 3a34 3420  76 jan 12 02:44 "
"000002e0: 636f 6d70 696c 655f 6266 2e70 790a 2d72  compile_bf.py.-r"
"000002f0: 772d 722d 2d72 2d2d 2020 3120 7365 6865  w-r--r--  1 sehe"
"00000300: 2073 6568 6520 2020 2038 3738 3020 6a61   sehe    8780 ja"
"00000310: 6e20 3132 2031 373a 3131 2074 6573 742e  n 12 17:11 test."
"00000320: 6269 6e0a 2d72 7778 7277 7872 2d78 2020  bin.-rwxrwxr-x  "
"00000330: 3120 7365 6865 2073 6568 6520 2020 2020  1 sehe sehe     "
"00000340: 3131 3920 6a61 6e20 3235 2031 333a 3537  119 jan 25 13:57"
"00000350: 2074 6573 742e 7079 0a2d 7277 7872 7778   test.py.-rwxrwx"
"00000360: 722d 7820 2031 2073 6568 6520 7365 6865  r-x  1 sehe sehe"
"00000370: 2020 2020 2020 3736 2066 6562 2020 3820        76 feb  8 "
"00000380: 3130 3a33 3920 7465 7374 2e73 680a 2d72  10:39 test.sh.-r"
"00000390: 772d 7277 2d72 2d2d 2020 3120 7365 6865  w-rw-r--  1 sehe"
"000003a0: 2073 6568 6520 2020 3236 3536 3920 6665   sehe   26569 fe"
"000003b0: 6220 2039 2031 313a 3533 2064 7261 6674  b  9 11:53 draft"
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[closing input to xxd]
"000003c0: 2e6d 640a 2d72 772d 7277 2d72 2d2d 2020  .md.-rw-rw-r--  "
"000003d0: 3120 7365 6865 2073 6568 6520 2020 2020  1 sehe sehe     "
"000003e0: 3131 3620 6665 6220 2039 2031 313a 3534  116 feb  9 11:54"
"000003f0: 2069 6e70 7574 2e74 7874 0a2d 7277 2d72   input.txt.-rw-r"
"00000400: 772d 722d 2d20 2031 2073 6568 6520 7365  w-r--  1 sehe se"
"00000410: 6865 2020 2020 2020 3739 2066 6562 2031  he      79 feb 1"
"00000420: 3020 3136 3a32 3420 6172 7869 760a 2d72  0 16:24 arxiv.-r"
"00000430: 772d 7277 2d72 2d2d 2020 3120 7365 6865  w-rw-r--  1 sehe"
"00000440: 2073 6568 6520 2020 2032 3933 3520 6665   sehe    2935 fe"
"00000450: 6220 3130 2031 363a 3238 2043 4d61 6b65  b 10 16:28 CMake"
"00000460: 4c69 7374 732e 7478 740a 2d72 772d 7277  Lists.txt.-rw-rw"
"00000470: 2d72 2d2d 2020 3120 7365 6865 2073 6568  -r--  1 sehe seh"
"00000480: 6520 2020 2035 3134 3520 6665 6220 3130  e    5145 feb 10"
"00000490: 2031 363a 3238 204d 616b 6566 696c 650a   16:28 Makefile."
"000004a0: 2d72 772d 7277 2d72 2d2d 2020 3120 7365  -rw-rw-r--  1 se"
"000004b0: 6865 2073 6568 6520 2020 2033 3937 3620  he sehe    3976 "
"000004c0: 6665 6220 3130 2031 363a 3430 2074 6573  feb 10 16:40 tes"
"000004d0: 7431 2e63 7070 0a2d 7277 2d72 772d 722d  t1.cpp.-rw-rw-r-"
"000004e0: 2d20 2031 2073 6568 6520 7365 6865 2020  -  1 sehe sehe  "
"000004f0: 2020 3632 3434 2066 6562 2031 3120 3031    6244 feb 11 01"
"00000500: 3a31 3320 7465 7374 2e63 7070 0a2d 7277  :13 test.cpp.-rw"
"00000510: 7872 7778 722d 7820 2031 2073 6568 6520  xrwxr-x  1 sehe "
"00000520: 7365 6865 2037 3139 3336 3838 2066 6562  sehe 7193688 feb"
"00000530: 2031 3120 3031 3a31 3320 736f 7465 7374   11 01:13 sotest"
"00000540: 0a2d 7277 2d72 772d 722d 2d20 2031 2073  .-rw-rw-r--  1 s"
"00000550: 6568 6520 7365 6865 2020 2020 3535 3132  ehe sehe    5512"
"00000560: 2066 6562 2031 3120 3031 3a31 3620 5365   feb 11 01:16 Se"
"00000570: 7373 696f 6e2e 7669 6d0a 6472 7778 7277  ssion.vim.drwxrw"
"00000580: 7872 2d78 2031 3120 7365 6865 2073 6568  xr-x 11 sehe seh"
"00000590: 6520 2020 2020 2032 3320 6665 6220 3131  e      23 feb 11"
"000005a0: 2030 313a 3137 2043 4d61 6b65 4669 6c65   01:17 CMakeFile"
"000005b0: 730a 2d72 772d 7277 2d72 2d2d 2020 3120  s.-rw-rw-r--  1 "
"000005c0: 7365 6865 2073 6568 6520 2020 2020 2037  sehe sehe      7"
"000005d0: 3520 6665 6220 3131 2030 313a 3137 206f  5 feb 11 01:17 o"
"000005e0: 7574 7075 742e 7478 740a                 utput.txt."
~ProcessManager: 'hex encoding' is done
~ProcessManager: 'listing' is done
~ProcessManager: 'hex encoding' is done
~ProcessManager: 'listing' is done

Я обнаружил, что я с видом на boost::process::child *process; член структуры, при использовании этого с общим указателем, как с ipstreamЯ получаю ожидаемые результаты.

Я собираюсь оставить этот вопрос открытым, хотя, поскольку я все еще не совсем счастлив, я делаю это правильно и был бы признателен за некоторый вклад.

Другие вопросы по тегам