Is it possible to use process output while the process is running?

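Boost.Process allows using Boost.Asio to perform asynchronous reads.

As far as I understand, this is useful for reading output while the process is running, without waiting for the process to terminate.

But to access this output, do I have to wait for the process to terminate, or can I access it while the process is still running? If so, how?

What I actually need is to access the beginning of the process output (to check that it started as expected) while keeping the process running.

To give more context, I run a process that I want to keep alive until the end of execution: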

boost::asio::io_service ios;
std::vector<char> buf;

bp::child c("process_that_needs_to_keep_running", args,
            bp::std_out > boost::asio::buffer(buf), ios);

ios.run();
// I DON'T WANT TO WAIT FOR c TO TERMINATE
// but I want to check that buf contains some text that ensures me it started correctly
// the issue I have here is that I don't know how to read from buf, since its size and content might not be consistent
// is it possible to take a snapshot for instance?
check_started_correctly(buf);

The issue here is that the producer creates output I have no control over; I only get to read what it emits.

If you use bp::std_out > some_kind_of_buffer_or_future, you will typically only get the result when the process exits.

However, you can use an async_pipe:

bp::async_pipe pipe(io);

bp::child c( //
    "/bin/bash",
    std::vector<std::string>{
        "-c",
        "for a in {1..20}; do sleep 1; echo message $a; done",
    },                    //
    bp::std_out > pipe,   //
    bp::on_exit(on_exit), //
    io);

Now you have to perform IO on that pipe explicitly:

boost::asio::streambuf sb;
async_read_until(                //
    pipe, sb, "message 5\n",     //
    [&](error_code ec, size_t) { //
        std::cout << "Got message 5 (" << ec.message() << ")" << std::endl;
    });
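The handler above only reports that "message 5\n" arrived. It does not look at the data. You may want to inspect the text, for example in the question's check_started_correctly. In that case, note that async_read_until hands you n, the number of bytes up to and including the delimiter, while sb may already contain bytes read past it. The sketch below models that step on a plain std::string so it runs without Boost. take_until is a hypothetical helper, not part of Boost. With a real boost::asio::streambuf you would copy the first n bytes of sb.data() and then call sb.consume(n).

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical helper modelling the step after async_read_until completes:
// `buffer` holds everything read so far, which may extend past the delimiter.
// Returns the prefix up to and including the first `delim` and removes only
// that prefix; bytes read past it stay in `buffer` for the next read.
std::optional<std::string> take_until(std::string& buffer, std::string_view delim) {
    assert(!delim.empty());
    auto pos = buffer.find(delim);
    if (pos == std::string::npos)
        return std::nullopt;         // delimiter not seen yet: keep reading
    auto n = pos + delim.size();     // what async_read_until reports as n
    std::string prefix = buffer.substr(0, n);
    buffer.erase(0, n);              // same effect as sb.consume(n)
    return prefix;
}
```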

Here is a complete, working program:

#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
#include <string>

namespace bp = boost::process;
using boost::system::error_code;

namespace /*file-static*/ {
    using namespace std::chrono_literals;
    static auto       now = std::chrono::steady_clock::now;
    static const auto t0  = now();

    static auto timestamp() {
        return std::to_string((now() - t0) / 1.s) + "s ";
    }
} // namespace

int main() {
    boost::asio::io_context io;
    bp::async_pipe pipe(io);

    auto on_exit = [](int code, std::error_code ec) {
        std::cout << timestamp() << "on_exit: " << ec.message() << " code "
                  << code << std::endl;
    };

    bp::child c( //
        "/bin/bash",
        std::vector<std::string>{
            "-c",
            "for a in {1..20}; do sleep 1; echo message $a; done",
        },                    //
        bp::std_out > pipe,   //
        bp::on_exit(on_exit), //
        io);

    boost::asio::streambuf sb;
    async_read_until(                //
        pipe, sb, "message 5\n",     //
        [&](error_code ec, size_t) { //
            std::cout << timestamp() << "Got message 5 (" << ec.message() << ")"
                      << std::endl;
        });

    io.run();
}

Prints

5.025400s Got message 5 (Success)
20.100547s on_exit: Success code 0

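This way you can react to what you are looking for as soon as it shows up. Keep in mind that the child process (or the shell running it) may buffer its output before it ever reaches the pipe. Here each echo line arrives as soon as it is printed, as the 5-second timestamp shows. Programs that write through C stdio, however, typically fully buffer stdout when it is a pipe rather than a terminal. Their output then only arrives when that buffer is flushed.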
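Even when the producer flushes each line, a single read from the pipe returns whatever bytes are available at that moment. That chunk may end mid-line or contain several lines. Below is a minimal std-only sketch of turning such chunks into complete lines, assuming '\n' line endings. LineSplitter is a hypothetical name, and the strings fed to it stand in for successive reads from the pipe.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical accumulator: feed it raw chunks as they arrive from a pipe and
// it returns only the complete ('\n'-terminated) lines, holding back a
// trailing partial line until the rest of it arrives.
class LineSplitter {
public:
    std::vector<std::string> feed(std::string_view chunk) {
        pending_.append(chunk.data(), chunk.size());
        std::vector<std::string> lines;
        std::size_t start = 0;
        for (auto nl = pending_.find('\n', start); nl != std::string::npos;
             nl = pending_.find('\n', start)) {
            lines.emplace_back(pending_, start, nl - start); // line without '\n'
            start = nl + 1;
        }
        pending_.erase(0, start);
        assert(pending_.find('\n') == std::string::npos); // only a partial line left
        return lines;
    }

    const std::string& pending() const { return pending_; }

private:
    std::string pending_;
};
```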

Big buffers?

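The above somewhat assumes that you can buffer the entire output up to the interesting message. What if that is gigabytes? As long as your pattern itself isn't gigabytes, you can keep reading until the criterion is met.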

Let's turn our example into an asynchronous grep that looks for the regex class\s*\w+_heap in all Boost headers. Of course, that is many megabytes of data, but we only use a 10 KiB buffer:

std::string text;
auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte

size_t total_received =0;
boost::regex const re(R"(class\s*\w+_heap)");

Now we create a read loop that reads until a match, discarding most of the buffer whenever it fills up without one:

std::function<void()> wait_for_message;
wait_for_message = [&] {
    async_read_until(                         //
        pipe, buf, re,                        //
        [&](error_code ec, size_t received) { //
            std::cerr << '\x0d' << timestamp() << "Checking for message ("
                      << ec.message() << ", total " << total_received
                      << ")                ";

            if (received || ec != boost::asio::error::not_found) {
                total_received += received;
                buf.consume(received);

                boost::smatch m;
                if (regex_search(text, m, re)) {
                    std::cout << "\n" << timestamp()
                              << "Found: " << std::quoted(m.str()) << " at "
                              << (total_received - m.size()) << " bytes"
                              << std::endl;
                }
            } else {
                // discard 90% of buffer capacity
                auto discard =
                    std::min(buf.max_size() / 10 * 9, buf.size());
                total_received += discard;
                buf.consume(discard);
            }

            if (!ec || ec == boost::asio::error::not_found)
                wait_for_message();
            else
                std::cout << "\n" << timestamp() << ec.message() << std::endl;
        });
};
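There is one detail in the handler above. It calls buf.consume(received) before regex_search, so the bytes holding the match that async_read_until just found are already gone when text is searched. What gets printed is a later match that happened to arrive in the same read, if there is one. Also, m.size() is the number of sub-matches, not a length or an offset.

Below is a hedged std-only sketch of the reporting step. It searches only the first received bytes before consuming them, and it computes the absolute offset from m.position(0). report_match is a hypothetical helper, and std::regex stands in for boost::regex, whose match_results interface is similar.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <regex>
#include <string>

// Hypothetical model of one completion of async_read_until(pipe, buf, re, ...):
// `text` is the dynamic buffer's backing string, `received` the byte count the
// handler was given (everything up to and including the match), and
// `total_received` the number of bytes consumed before this call.
// Returns the absolute stream offset of the match, then consumes the bytes.
std::optional<std::size_t> report_match(std::string& text, std::size_t received,
                                        std::size_t& total_received,
                                        const std::regex& re) {
    assert(received <= text.size());
    std::optional<std::size_t> offset;
    std::smatch m;
    // Search only the region the read reported, before consuming it.
    auto last = text.cbegin() + static_cast<std::ptrdiff_t>(received);
    if (std::regex_search(text.cbegin(), last, m, re))
        offset = total_received + static_cast<std::size_t>(m.position(0));
    total_received += received;
    text.erase(0, received); // same effect as buf.consume(received)
    return offset;
}
```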

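Of course, this scheme can miss a match that is longer than 10% of the buffer size. That is because only the last 10% of the previous buffer contents is kept, so that a match can straddle a read boundary.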
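To make the 10% figure concrete: with max_size = 10 * 1024 = 10240 bytes, the discard branch drops 10240 / 10 * 9 = 9216 bytes, so only the last 1024 bytes survive into the next read.

The sketch below is a simplified, std-only variant of the same bounded-window idea, run over a list of chunks that stand in for successive pipe reads. Unlike the Asio handler, it trims the window after every chunk instead of only when the buffer is full. scan_chunks and retained_tail are hypothetical names. The test shows that a match straddling a chunk boundary is still found.

```cpp
#include <cassert>
#include <cstddef>
#include <regex>
#include <string>
#include <vector>

// Bytes kept after the "discard 90% of buffer capacity" step:
// max_size - max_size / 10 * 9 (1024 for a 10 KiB buffer).
constexpr std::size_t retained_tail(std::size_t max_size) {
    return max_size - max_size / 10 * 9;
}

// Hypothetical bounded-memory grep over successive chunks. Between chunks the
// window holds at most retained_tail(max_size) bytes. Returns the absolute
// stream offsets of the matches found. Assumes `re` cannot match "".
std::vector<std::size_t> scan_chunks(const std::vector<std::string>& chunks,
                                     const std::regex& re, std::size_t max_size) {
    assert(max_size >= 10);
    const std::size_t keep = retained_tail(max_size);
    std::vector<std::size_t> offsets;
    std::string window;
    std::size_t dropped = 0; // bytes removed from the front of the stream so far
    for (const auto& chunk : chunks) {
        window += chunk;
        std::smatch m;
        // Report and drop every complete match now in the window.
        while (std::regex_search(window, m, re)) {
            auto pos = static_cast<std::size_t>(m.position(0));
            auto end = pos + static_cast<std::size_t>(m.length(0));
            offsets.push_back(dropped + pos);
            dropped += end;
            window.erase(0, end);
        }
        // Keep only the tail, which may hold the start of a match that the
        // next chunk completes.
        if (window.size() > keep) {
            auto discard = window.size() - keep;
            dropped += discard;
            window.erase(0, discard);
        }
    }
    return offsets;
}
```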

Again, the complete program:

#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <boost/asio.hpp>
#include <boost/regex.hpp>
#include <algorithm>
#include <chrono>
#include <functional>
#include <iostream>
#include <iomanip>
#include <string>

namespace bp = boost::process;
using boost::system::error_code;

namespace /*file-static*/ {
    using namespace std::chrono_literals;
    static auto       now = std::chrono::steady_clock::now;
    static const auto t0  = now();

    static auto timestamp() {
        return std::to_string((now() - t0) / 1.s) + "s ";
    }
} // namespace

int main() {
    boost::asio::io_context io;
    bp::async_pipe pipe(io);

    auto on_exit = [](int code, std::error_code ec) {
        std::cout << timestamp() << "on_exit: " << ec.message() << " code "
                  << code << std::endl;
    };

    bp::child c( //
        "/usr/bin/find",
        std::vector<std::string>{"/usr/local/include/boost", "-name",
                                 "*.hpp", "-exec", "cat", "{}", "+"},
        bp::std_out > pipe,   //
        bp::on_exit(on_exit), //
        io);

    std::string text;
    auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte

    size_t total_received =0;
    boost::regex const re(R"(class\s*\w+_heap)");

    std::function<void()> wait_for_message;
    wait_for_message = [&] {
        async_read_until(                         //
            pipe, buf, re,                        //
            [&](error_code ec, size_t received) { //
                std::cerr << '\x0d' << timestamp() << "Checking for message ("
                          << ec.message() << ", total " << total_received
                          << ")                ";

                if (received || ec != boost::asio::error::not_found) {
                    total_received += received;
                    buf.consume(received);

                    boost::smatch m;
                    if (regex_search(text, m, re)) {
                        std::cout << "\n" << timestamp()
                                  << "Found: " << std::quoted(m.str()) << " at "
                                  << (total_received - m.size()) << " bytes"
                                  << std::endl;
                    }
                } else {
                    // discard 90% of buffer capacity
                    auto discard =
                        std::min(buf.max_size() / 10 * 9, buf.size());
                    total_received += discard;
                    buf.consume(discard);
                }

                if (!ec || ec == boost::asio::error::not_found)
                    wait_for_message();
                else
                    std::cout << "\n" << timestamp() << ec.message() << std::endl;
            });
    };

    wait_for_message();
    io.run();

    std::cout << timestamp() << " - Done, total_received: " << total_received << "\n";
}

Prints

2.033324s Found: "class d_ary_heap" at 6747512 bytes
2.065290s Found: "class pairing_heap" at 6831390 bytes
2.071888s Found: "class binomial_heap" at 6860833 bytes
2.072715s Found: "class skew_heap" at 6895677 bytes
2.073348s Found: "class fibonacci_heap" at 6921559 bytes
34.729355s End of file
34.730515s on_exit: Success code 0
34.730593s  - Done, total_received: 154746011
