Boost async_pipe 未显示所有 child 过程输出

Boost async_pipe not showing all child process output

我运行进了路障。下面的代码有问题,但这只是一个演示;我想先搞清楚高层逻辑。

这两个启动应用程序在到达“就绪”状态之前输出了很多启动信息。在此状态下,程序 A 已准备好通过标准输入接受用户输入。程序B只是通过网络连接监听--摄取和记录数据。

理想情况下,使用这个示例程序,我应该能够在“real-time”中看到程序 B 的输出。但是在每次循环迭代中,什么也没有发生;我不确定它是否通过管道接收输入。

我之前使用 bp::opstream 写入 child 的--程序 A--stdin。我知道程序 A 是否通过其 async_pipe 接受了某些命令,程序 B show 也会显示一些日志信息(例如“trip”)。这些是 window 控制台应用程序,我正在使用 Boost C++ 作为 child 进程与它们交互。

有人知道发生了什么事吗?



std::size_t read_loop(bp::async_pipe& p, mutable_buffer buf, boost::system::error_code &err)
{
    return p.read_some(buf, err);


}



void read_loop_async(bp::async_pipe& p, mutable_buffer buf, std::error_code &err) {
    p.async_read_some(buf, [&p, buf, &err](std::error_code ec, size_t n) {
        std::cout << "Received " << n << " bytes (" << ec.message() << "): '";
        std::cout.write(boost::asio::buffer_cast<char const*>(buf), n) << std::endl;
        err = ec;

        if (!ec)
            read_loop_async(p, buf, err);

    });
}


void write_pipe(bp::async_pipe&p, mutable_buffer buf)
{
    ba::async_write(p, buf, [](boost::system::error_code ec, std::size_t sz)
    {
        std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
    });

}


int main()
{

    bp::opstream sendToChild;
    string wd = "<---path-to-working-dir----->";
    ba::io_service ios;
    string bin = "<path-to-bin-and-name>";


    bp::async_pipe input_pipe(ios);
    bp::async_pipe output_pipe(ios);

    bp::child c(bin, "arg1", "arg2", "arg3", bp::std_out > output_pipe,
        bp::std_in < input_pipe, ios, bp::start_dir(wd.c_str()));

    size_t size = 8192;
    string input;

    vector <char> buffer(size);
    boost::system::error_code ec;

    std::error_code err;

    ios.run();
    while (1)
    {
        //show read whatever is available from the childs output_pipe
        read_loop_async(output_pipe, bp::buffer(buffer), err);



        cout << "\nBoot-> ";
        cin >> input;
        if (input == "1")
        {
            cout << "   send input to child: ";
            cin >> input;
            //send commands to the child, Program A
            //originally
            //sendToChild << input<< endl;
            write_pipe(input_pipe, bp::buffer(input));
        }
        if (input == "quit")
        {
            //sendToChild << input << endl;
            read_loop_async(output_pipe, bp::buffer(buffer), err);
            break;
        }

        ios.poll(ec);
        ios.restart();

    }

    c.join();
    cout << "done...";
    cin >> input;
}



这是我关注的link:

嗯。有很多东西要打开。首先:

ios.run();

运行直到 child 完成。如果 child 进程需要发送的输出超过缓冲区的容量,那么它很可能会死锁,因为在执行 ios.run().

之前你不会消耗任何输出

根据定义,下一个 poll() 不执行任何操作,因为您没有先调用 restart。幸运的是,您忽略了错误代码,接下来会发生 restart

然后,你会遇到下一个问题,因为循环的下一次迭代以 another read_loop_async(output_pipe, bp::buffer(buffer), err); 开始,这意味着你有重叠的读取操作,这通常是被禁止的( Undefined Behaviour),但无论如何都会遇到 UB,因为您使用的是相同的缓冲区。

这本身足以解释“丢失数据”,因为,是的,您在同一位置进行多次读取,所以一个会破坏另一个。也就是说,如果你可以推理它,因为你不能推理UB。

奇怪的是,现在我的眼睛甚至发现了 read_loop_async 的第三次调用。这没有道理。顾名思义,read_loop_async已经是一个循环:它在完成时调用自己:

    if (!ec)
        read_loop_async(p, buf, err);

因此,预计只会调用 1 次。您似乎没有理解 async_* 启动功能总是 return 立即(因为操作完成 异步 )。这也体现在您分配的事实中:

    err = ec;

其中 err 是启动函数的引用参数。它不是那样工作的。该错误仅在完成时可用。由于您似乎无论如何都不会在读取循环之外使用它,所以我会放弃它。

然后是

        sendToChild << input << std::endl;

什么都不做,因为 sendToChild 只是字面上的声明,从未在其他地方使用过。

write_pipe 再次 尝试 使用 async_ 启动,但它不能,因为它正在同步输入循环中使用。只是不要在那里使用异步。正如所写,它是 UB 的另一个来源,因为 buf 参数将指向一个 std::string 变量,该变量在 main 函数中被改变。所以,简化:

void write_pipe(bp::async_pipe& p, const_buffer buf) {
    error_code ec;
    auto       sz = write(p, buf, ec);
    std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
}

[注意它如何正确地将 buf 标记为 const_buffer。]

现在,可能修复 sendToChild

使用的问题
  • 同时关闭管道(向 child 发送 EOF 信号)
  • 打破输入循环
    if (input == "quit") {
        write_pipe(input_pipe, bp::buffer(input + "\n"));
        input_pipe.close();
        break;
    }

我会用 poll() 替换 ios.restart() - 因为我们没有 run() 太早了。

除上述之外,我将 operator>> 替换为 std::getline 调用,因为您很可能希望用户输入用 Enter 键分隔,而不是 space。我还像在 sendToChild 行中一样添加了 "\n",因为它有助于通过使用 line-buffered 输入的简单测试 child 进行演示。

现在,我们将以此作为测试 child:

bp::child c(bin, "-c",
            "time while read line; do echo \"$line\" | rev | xxd; done", //
            bp::std_out > output_pipe,
            bp::std_in < input_pipe, //
            ios,                     //
            bp::start_dir(wd));

这意味着我们的输入在反向和十六进制转储中得到回显,最后是时间摘要。

有点固定列表

Live On Coliru

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
namespace bp = boost::process;
using boost::asio::const_buffer;
using boost::asio::mutable_buffer;
using boost::system::error_code;

void read_loop_async(bp::async_pipe& p, mutable_buffer buf) {
    p.async_read_some(buf, [&p, buf](std::error_code ec, size_t n) {
        std::cout << "Received " << n << " bytes (" << ec.message() << "): '";
        std::cout.write(boost::asio::buffer_cast<char const*>(buf), n) << std::endl;

        if (!ec)
            read_loop_async(p, buf);
    });
}

void write_pipe(bp::async_pipe& p, const_buffer buf) {
    error_code ec;
    auto       sz = write(p, buf, ec);
    std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
}

int main() {
    std::string wd = "/home/sehe/Projects/Whosebug";
    boost::asio::io_service ios;
    std::string bin = "/bin/bash";


    bp::async_pipe input_pipe(ios);
    bp::async_pipe output_pipe(ios);

    bp::child c(bin, "-c",
                "while read line; do echo \"$line\" | rev | xxd; done", //
                bp::std_out > output_pipe,
                bp::std_in < input_pipe, //
                ios,                     //
                bp::start_dir(wd));


    // Single invocation!
    std::vector<char> buffer(8192);
    read_loop_async(output_pipe, bp::buffer(buffer));

    std::cout << "\nBoot-> ";
    for (std::string input; getline(std::cin, input);
         std::cout << "\nBoot-> ") {
        if (input == "1") {
            std::cout << "   send input to child: ";
            if (getline(std::cin, input)) {
                write_pipe(input_pipe, bp::buffer(input + "\n"));
            }
        }
        if (input == "quit") {
            write_pipe(input_pipe, bp::buffer(input + "\n"));
            input_pipe.close();
            break;
        }

        ios.poll();
    }

    ios.run(); // effectively like `c.wait();` but async

    std::cout << "done...";
    // ignore until line end
    std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}

测试

g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp -lboost_{system,filesystem} && ./a.out <<HERE
1
Hello world
Ignored
1
Bye world
quit
HERE

打印

Boot->    send input to child: Size Written 12 Ec: system:0 Success

Boot-> 
Boot->    send input to child: Size Written 10 Ec: system:0 Success

Boot-> Size Written 5 Ec: system:0 Success
Received 64 bytes (Success): '00000000: 646c 726f 7720 6f6c 6c65 480a            dlrow olleH.

Received 62 bytes (Success): '00000000: 646c 726f 7720 6579 420a                 dlrow eyB.

Received 57 bytes (Success): '00000000: 7469 7571 0a                             tiuq.

Received 0 bytes (End of file): '
done...

在我的系统上哪个更容易以交互方式跟随: