Boost async_pipe 未显示所有 child 过程输出
Boost async_pipe not showing all child process output
我运行进了路障。下面的代码有问题,但这只是一个演示;我想先搞清楚高层逻辑。
这两个启动应用程序在到达“就绪”状态之前输出了很多启动信息。在此状态下,程序 A 已准备好通过标准输入接受用户输入。程序B只是通过网络连接监听--摄取和记录数据。
理想情况下,使用这个示例程序,我应该能够在“real-time”中看到程序 B 的输出。但是在每次循环迭代中,什么也没有发生;我不确定它是否通过管道接收输入。
我之前使用 bp::opstream
写入 child 的--程序 A--stdin。我知道程序 A 是否通过其 async_pipe 接受了某些命令,程序 B show 也会显示一些日志信息(例如“trip”)。这些是 window 控制台应用程序,我正在使用 Boost C++ 作为 child 进程与它们交互。
有人知道发生了什么事吗?
std::size_t read_loop(bp::async_pipe& p, mutable_buffer buf, boost::system::error_code &err)
{
return p.read_some(buf, err);
}
void read_loop_async(bp::async_pipe& p, mutable_buffer buf, std::error_code &err) {
p.async_read_some(buf, [&p, buf, &err](std::error_code ec, size_t n) {
std::cout << "Received " << n << " bytes (" << ec.message() << "): '";
std::cout.write(boost::asio::buffer_cast<char const*>(buf), n) << std::endl;
err = ec;
if (!ec)
read_loop_async(p, buf, err);
});
}
void write_pipe(bp::async_pipe&p, mutable_buffer buf)
{
ba::async_write(p, buf, [](boost::system::error_code ec, std::size_t sz)
{
std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
});
}
int main()
{
bp::opstream sendToChild;
string wd = "<---path-to-working-dir----->";
ba::io_service ios;
string bin = "<path-to-bin-and-name>";
bp::async_pipe input_pipe(ios);
bp::async_pipe output_pipe(ios);
bp::child c(bin, "arg1", "arg2", "arg3", bp::std_out > output_pipe,
bp::std_in < input_pipe, ios, bp::start_dir(wd.c_str()));
size_t size = 8192;
string input;
vector <char> buffer(size);
boost::system::error_code ec;
std::error_code err;
ios.run();
while (1)
{
//show read whatever is available from the childs output_pipe
read_loop_async(output_pipe, bp::buffer(buffer), err);
cout << "\nBoot-> ";
cin >> input;
if (input == "1")
{
cout << " send input to child: ";
cin >> input;
//send commands to the child, Program A
//originally
//sendToChild << input<< endl;
write_pipe(input_pipe, bp::buffer(input));
}
if (input == "quit")
{
//sendToChild << input << endl;
read_loop_async(output_pipe, bp::buffer(buffer), err);
break;
}
ios.poll(ec);
ios.restart();
}
c.join();
cout << "done...";
cin >> input;
}
这是我关注的link:
嗯。有很多东西要打开。首先:
ios.run();
运行直到 child 完成。如果 child 进程需要发送的输出超过缓冲区的容量,那么它很可能会死锁,因为在执行 ios.run()
.
之前你不会消耗任何输出
根据定义,下一个 poll()
不执行任何操作,因为您没有先调用 restart
。幸运的是,您忽略了错误代码,接下来会发生 restart
。
然后,你会遇到下一个问题,因为循环的下一次迭代以 another read_loop_async(output_pipe, bp::buffer(buffer), err);
开始,这意味着你有重叠的读取操作,这通常是被禁止的( Undefined Behaviour),但无论如何都会遇到 UB,因为您使用的是相同的缓冲区。
这本身足以解释“丢失数据”,因为,是的,您在同一位置进行多次读取,所以一个会破坏另一个。也就是说,如果你可以推理它,因为你不能推理UB。
奇怪的是,现在我的眼睛甚至发现了 read_loop_async
的第三次调用。这没有道理。顾名思义,read_loop_async
已经是一个循环:它在完成时调用自己:
if (!ec)
read_loop_async(p, buf, err);
因此,预计只会调用 1 次。您似乎没有理解 async_*
启动功能总是 return 立即(因为操作完成 异步 )。这也体现在您分配的事实中:
err = ec;
其中 err
是启动函数的引用参数。它不是那样工作的。该错误仅在完成时可用。由于您似乎无论如何都不会在读取循环之外使用它,所以我会放弃它。
然后是
sendToChild << input << std::endl;
什么都不做,因为 sendToChild
只是字面上的声明,从未在其他地方使用过。
write_pipe
再次 尝试 使用 async_
启动,但它不能,因为它正在同步输入循环中使用。只是不要在那里使用异步。正如所写,它是 UB 的另一个来源,因为 buf
参数将指向一个 std::string
变量,该变量在 main 函数中被改变。所以,简化:
void write_pipe(bp::async_pipe& p, const_buffer buf) {
error_code ec;
auto sz = write(p, buf, ec);
std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
}
[注意它如何正确地将 buf
标记为 const_buffer
。]
现在,可能修复 sendToChild
被
使用的问题
- 同时关闭管道(向 child 发送 EOF 信号)
- 打破输入循环
if (input == "quit") {
write_pipe(input_pipe, bp::buffer(input + "\n"));
input_pipe.close();
break;
}
我会用 poll()
替换 ios.restart()
- 因为我们没有 run()
太早了。
除上述之外,我将 operator>>
替换为 std::getline
调用,因为您很可能希望用户输入用 Enter 键分隔,而不是 space。我还像在 sendToChild
行中一样添加了 "\n"
,因为它有助于通过使用 line-buffered 输入的简单测试 child 进行演示。
现在,我们将以此作为测试 child:
bp::child c(bin, "-c",
"time while read line; do echo \"$line\" | rev | xxd; done", //
bp::std_out > output_pipe,
bp::std_in < input_pipe, //
ios, //
bp::start_dir(wd));
这意味着我们的输入在反向和十六进制转储中得到回显,最后是时间摘要。
有点固定列表
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
namespace bp = boost::process;
using boost::asio::const_buffer;
using boost::asio::mutable_buffer;
using boost::system::error_code;
void read_loop_async(bp::async_pipe& p, mutable_buffer buf) {
p.async_read_some(buf, [&p, buf](std::error_code ec, size_t n) {
std::cout << "Received " << n << " bytes (" << ec.message() << "): '";
std::cout.write(boost::asio::buffer_cast<char const*>(buf), n) << std::endl;
if (!ec)
read_loop_async(p, buf);
});
}
void write_pipe(bp::async_pipe& p, const_buffer buf) {
error_code ec;
auto sz = write(p, buf, ec);
std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
}
int main() {
std::string wd = "/home/sehe/Projects/Whosebug";
boost::asio::io_service ios;
std::string bin = "/bin/bash";
bp::async_pipe input_pipe(ios);
bp::async_pipe output_pipe(ios);
bp::child c(bin, "-c",
"while read line; do echo \"$line\" | rev | xxd; done", //
bp::std_out > output_pipe,
bp::std_in < input_pipe, //
ios, //
bp::start_dir(wd));
// Single invocation!
std::vector<char> buffer(8192);
read_loop_async(output_pipe, bp::buffer(buffer));
std::cout << "\nBoot-> ";
for (std::string input; getline(std::cin, input);
std::cout << "\nBoot-> ") {
if (input == "1") {
std::cout << " send input to child: ";
if (getline(std::cin, input)) {
write_pipe(input_pipe, bp::buffer(input + "\n"));
}
}
if (input == "quit") {
write_pipe(input_pipe, bp::buffer(input + "\n"));
input_pipe.close();
break;
}
ios.poll();
}
ios.run(); // effectively like `c.wait();` but async
std::cout << "done...";
// ignore until line end
std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}
测试
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp -lboost_{system,filesystem} && ./a.out <<HERE
1
Hello world
Ignored
1
Bye world
quit
HERE
打印
Boot-> send input to child: Size Written 12 Ec: system:0 Success
Boot->
Boot-> send input to child: Size Written 10 Ec: system:0 Success
Boot-> Size Written 5 Ec: system:0 Success
Received 64 bytes (Success): '00000000: 646c 726f 7720 6f6c 6c65 480a dlrow olleH.
Received 62 bytes (Success): '00000000: 646c 726f 7720 6579 420a dlrow eyB.
Received 57 bytes (Success): '00000000: 7469 7571 0a tiuq.
Received 0 bytes (End of file): '
done...
在我的系统上哪个更容易以交互方式跟随:
我运行进了路障。下面的代码有问题,但这只是一个演示;我想先搞清楚高层逻辑。
这两个启动应用程序在到达“就绪”状态之前输出了很多启动信息。在此状态下,程序 A 已准备好通过标准输入接受用户输入。程序B只是通过网络连接监听--摄取和记录数据。
理想情况下,使用这个示例程序,我应该能够在“real-time”中看到程序 B 的输出。但是在每次循环迭代中,什么也没有发生;我不确定它是否通过管道接收输入。
我之前使用 bp::opstream
写入 child 的--程序 A--stdin。我知道程序 A 是否通过其 async_pipe 接受了某些命令,程序 B show 也会显示一些日志信息(例如“trip”)。这些是 window 控制台应用程序,我正在使用 Boost C++ 作为 child 进程与它们交互。
有人知道发生了什么事吗?
std::size_t read_loop(bp::async_pipe& p, mutable_buffer buf, boost::system::error_code &err)
{
return p.read_some(buf, err);
}
void read_loop_async(bp::async_pipe& p, mutable_buffer buf, std::error_code &err) {
p.async_read_some(buf, [&p, buf, &err](std::error_code ec, size_t n) {
std::cout << "Received " << n << " bytes (" << ec.message() << "): '";
std::cout.write(boost::asio::buffer_cast<char const*>(buf), n) << std::endl;
err = ec;
if (!ec)
read_loop_async(p, buf, err);
});
}
void write_pipe(bp::async_pipe&p, mutable_buffer buf)
{
ba::async_write(p, buf, [](boost::system::error_code ec, std::size_t sz)
{
std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
});
}
int main()
{
bp::opstream sendToChild;
string wd = "<---path-to-working-dir----->";
ba::io_service ios;
string bin = "<path-to-bin-and-name>";
bp::async_pipe input_pipe(ios);
bp::async_pipe output_pipe(ios);
bp::child c(bin, "arg1", "arg2", "arg3", bp::std_out > output_pipe,
bp::std_in < input_pipe, ios, bp::start_dir(wd.c_str()));
size_t size = 8192;
string input;
vector <char> buffer(size);
boost::system::error_code ec;
std::error_code err;
ios.run();
while (1)
{
//show read whatever is available from the childs output_pipe
read_loop_async(output_pipe, bp::buffer(buffer), err);
cout << "\nBoot-> ";
cin >> input;
if (input == "1")
{
cout << " send input to child: ";
cin >> input;
//send commands to the child, Program A
//originally
//sendToChild << input<< endl;
write_pipe(input_pipe, bp::buffer(input));
}
if (input == "quit")
{
//sendToChild << input << endl;
read_loop_async(output_pipe, bp::buffer(buffer), err);
break;
}
ios.poll(ec);
ios.restart();
}
c.join();
cout << "done...";
cin >> input;
}
这是我关注的link:
嗯。有很多东西要打开。首先:
ios.run();
运行直到 child 完成。如果 child 进程需要发送的输出超过缓冲区的容量,那么它很可能会死锁,因为在执行 ios.run()
.
根据定义,下一个 poll()
不执行任何操作,因为您没有先调用 restart
。幸运的是,您忽略了错误代码,接下来会发生 restart
。
然后,你会遇到下一个问题,因为循环的下一次迭代以 another read_loop_async(output_pipe, bp::buffer(buffer), err);
开始,这意味着你有重叠的读取操作,这通常是被禁止的( Undefined Behaviour),但无论如何都会遇到 UB,因为您使用的是相同的缓冲区。
这本身足以解释“丢失数据”,因为,是的,您在同一位置进行多次读取,所以一个会破坏另一个。也就是说,如果你可以推理它,因为你不能推理UB。
奇怪的是,现在我的眼睛甚至发现了 read_loop_async
的第三次调用。这没有道理。顾名思义,read_loop_async
已经是一个循环:它在完成时调用自己:
if (!ec)
read_loop_async(p, buf, err);
因此,预计只会调用 1 次。您似乎没有理解 async_*
启动功能总是 return 立即(因为操作完成 异步 )。这也体现在您分配的事实中:
err = ec;
其中 err
是启动函数的引用参数。它不是那样工作的。该错误仅在完成时可用。由于您似乎无论如何都不会在读取循环之外使用它,所以我会放弃它。
然后是
sendToChild << input << std::endl;
什么都不做,因为 sendToChild
只是字面上的声明,从未在其他地方使用过。
write_pipe
再次 尝试 使用 async_
启动,但它不能,因为它正在同步输入循环中使用。只是不要在那里使用异步。正如所写,它是 UB 的另一个来源,因为 buf
参数将指向一个 std::string
变量,该变量在 main 函数中被改变。所以,简化:
void write_pipe(bp::async_pipe& p, const_buffer buf) {
error_code ec;
auto sz = write(p, buf, ec);
std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
}
[注意它如何正确地将 buf
标记为 const_buffer
。]
现在,可能修复 sendToChild
被
- 同时关闭管道(向 child 发送 EOF 信号)
- 打破输入循环
if (input == "quit") {
write_pipe(input_pipe, bp::buffer(input + "\n"));
input_pipe.close();
break;
}
我会用 poll()
替换 ios.restart()
- 因为我们没有 run()
太早了。
除上述之外,我将 operator>>
替换为 std::getline
调用,因为您很可能希望用户输入用 Enter 键分隔,而不是 space。我还像在 sendToChild
行中一样添加了 "\n"
,因为它有助于通过使用 line-buffered 输入的简单测试 child 进行演示。
现在,我们将以此作为测试 child:
bp::child c(bin, "-c",
"time while read line; do echo \"$line\" | rev | xxd; done", //
bp::std_out > output_pipe,
bp::std_in < input_pipe, //
ios, //
bp::start_dir(wd));
这意味着我们的输入在反向和十六进制转储中得到回显,最后是时间摘要。
有点固定列表
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
namespace bp = boost::process;
using boost::asio::const_buffer;
using boost::asio::mutable_buffer;
using boost::system::error_code;
void read_loop_async(bp::async_pipe& p, mutable_buffer buf) {
p.async_read_some(buf, [&p, buf](std::error_code ec, size_t n) {
std::cout << "Received " << n << " bytes (" << ec.message() << "): '";
std::cout.write(boost::asio::buffer_cast<char const*>(buf), n) << std::endl;
if (!ec)
read_loop_async(p, buf);
});
}
void write_pipe(bp::async_pipe& p, const_buffer buf) {
error_code ec;
auto sz = write(p, buf, ec);
std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
}
int main() {
std::string wd = "/home/sehe/Projects/Whosebug";
boost::asio::io_service ios;
std::string bin = "/bin/bash";
bp::async_pipe input_pipe(ios);
bp::async_pipe output_pipe(ios);
bp::child c(bin, "-c",
"while read line; do echo \"$line\" | rev | xxd; done", //
bp::std_out > output_pipe,
bp::std_in < input_pipe, //
ios, //
bp::start_dir(wd));
// Single invocation!
std::vector<char> buffer(8192);
read_loop_async(output_pipe, bp::buffer(buffer));
std::cout << "\nBoot-> ";
for (std::string input; getline(std::cin, input);
std::cout << "\nBoot-> ") {
if (input == "1") {
std::cout << " send input to child: ";
if (getline(std::cin, input)) {
write_pipe(input_pipe, bp::buffer(input + "\n"));
}
}
if (input == "quit") {
write_pipe(input_pipe, bp::buffer(input + "\n"));
input_pipe.close();
break;
}
ios.poll();
}
ios.run(); // effectively like `c.wait();` but async
std::cout << "done...";
// ignore until line end
std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}
测试
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp -lboost_{system,filesystem} && ./a.out <<HERE
1
Hello world
Ignored
1
Bye world
quit
HERE
打印
Boot-> send input to child: Size Written 12 Ec: system:0 Success
Boot->
Boot-> send input to child: Size Written 10 Ec: system:0 Success
Boot-> Size Written 5 Ec: system:0 Success
Received 64 bytes (Success): '00000000: 646c 726f 7720 6f6c 6c65 480a dlrow olleH.
Received 62 bytes (Success): '00000000: 646c 726f 7720 6579 420a dlrow eyB.
Received 57 bytes (Success): '00000000: 7469 7571 0a tiuq.
Received 0 bytes (End of file): '
done...
在我的系统上哪个更容易以交互方式跟随: