Simultaneous read and write to a child's stdio using Boost.Process
I am trying to write to and read from a child's stdio using boost.process, like this:
boost::asio::io_service writeService, readService;
bp::async_pipe in{writeService};
bp::async_pipe out{readService};

bp::child process(CompressCmd.c_str(), bp::std_in < in, bp::std_out > out);
Buffer src;
src.reserve(4 * 1024 * 1024);
integer_type read = 0;
//std::atomic_int64_t totalWrite{0};
integer_type totalWrite = 0;
while (callback(CallbackActions::NeedMoreInput, src, read)) {
    in.async_write_some(
        boost::asio::buffer(src.data(), read),
        [](const boost::system::error_code &e, std::size_t) { });
    // written data is not important, that's why using same buffer
    out.async_read_some(boost::asio::buffer(src.data(), src.capacity()),
                        [&](const boost::system::error_code &e,
                            std::size_t byte_transferred) { totalWrite += byte_transferred; });
}
writeService.run();
in.close();
readService.run();
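All reads and writes are reported as successful, but the value of totalWrite is totally incorrect: for example it reports 29356032 when the real value should be around 50000000.
I noticed the program terminates halfway through.
Calling process.wait() after readService.run() freezes the child.
Using an atomic int produces the same behaviour.
For now I only need to know how much data is actually written, which is why I am using the same buffer.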
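This pattern:
while (callback(CallbackActions::NeedMoreInput, src, read)) {
    in.async_write_some(...);
    out.async_read_some(...);
}
is most likely misguided: async operations always return immediately, so you simply keep adding more async operations without ever giving them a chance to run.
Equally misguided is the fact that you give the pipes separate services but run them strictly one after the other, so no read operation will ever run until writeService completes.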
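To make the "returns immediately" point concrete, here is a toy model of an event loop. It is not Asio: `ToyService`, `post`, `run` and `bytes_counted_before_run` are made-up names for illustration, and the only assumption is that handlers are stored when an operation is initiated and invoked later from `run()`.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Toy stand-in for an event loop (hypothetical, NOT boost::asio::io_service):
// post() only stores the handler; nothing executes until run() is called.
struct ToyService {
    std::queue<std::function<void()>> pending;

    void post(std::function<void()> handler) { pending.push(std::move(handler)); }

    // Invokes queued handlers until the queue is empty; returns how many ran.
    std::size_t run() {
        std::size_t executed = 0;
        while (!pending.empty()) {
            auto handler = std::move(pending.front());
            pending.pop();
            handler();
            ++executed;
        }
        return executed;
    }
};

// Mimics the loop in the question: initiate `n` operations back to back and
// report the byte counter as it stands before run() has been called.
inline std::size_t bytes_counted_before_run(ToyService& svc, std::size_t& total, int n) {
    for (int i = 0; i < n; ++i)
        svc.post([&total] { total += 100; }); // returns immediately, nothing counted
    return total; // unchanged: no handler has run yet
}
```

With a fresh `ToyService` and a counter starting at 0, the function returns 0 for any `n`; the counter only moves once `run()` is called, which is why the totals in the question cannot be trusted inside the loop.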
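The atomic type is misguided, since there is no access from multiple threads.
What are you trying to do? You reserve a large buffer but never put any data into it (reserve != resize), so you can only hope to write nothing.
Even more ironically, you are reading into the exact same buffer, at the exact same spot. However, that is immediately Undefined Behaviour¹, because you pass it src.capacity() when you know that src.size()==0.
Even without that error, how can you "simultaneously" read and write exactly the same bytes in memory and still know what the expected outcome would be?
You are not passing your own io_service to Boost Process.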
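The reserve/resize distinction can be checked in isolation. The sketch below assumes that the question's Buffer behaves like std::vector<char> (the question does not show its definition); `BufferState`, `after_reserve` and `after_resize` are illustrative names only.

```cpp
#include <cstddef>
#include <vector>

// Snapshot of a buffer: how many elements exist, and whether storage for the
// requested amount has been allocated.
struct BufferState {
    std::size_t size;
    bool has_capacity;
};

// reserve() allocates storage but creates no elements: size() stays 0, so
// there are no bytes that may legally be read or written yet.
inline BufferState after_reserve(std::size_t n) {
    std::vector<char> v;
    v.reserve(n);
    return {v.size(), v.capacity() >= n};
}

// resize() creates n value-initialized elements: size() becomes n, and
// data() .. data() + n is a valid range to hand to a read operation.
inline BufferState after_resize(std::size_t n) {
    std::vector<char> v;
    v.resize(n);
    return {v.size(), v.capacity() >= n};
}
```

Under that assumption, after_reserve(4096) reports a size of 0 while after_resize(4096) reports 4096, so a buffer meant to receive data should be sized with resize and passed using size(), not capacity().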
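A Working Demo
Here is a working sample. Of course, I had to guess what you actually want to do.
I opted to make the program send its own source (main.cpp) to stdin and read stdout iteratively, recording the total_received bytes. It then prints the exit code and that total.
As a makeshift compressor I used '/usr/bin/xxd', because it is available and its output could even usefully be printed to std::cout for debugging.
Live On Coliru // trouble on Coliru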
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <array>
#include <iostream>
#include <string>
#include <vector>

std::vector<char> read_file(std::string const&);

namespace bp = boost::process;
using boost::system::error_code;

using Loop = boost::function<void()>;
using Buffer = std::array<char, 4*1024>;

int main() {
    boost::asio::io_service svc; // a single service drives both pipes and the child

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);

    auto data = read_file("main.cpp");

    Loop read_loop, write_loop;

    Buffer recv_buffer;
    std::size_t total_received = 0;
    // Each completed read schedules the next one until an error (such as EOF) occurs.
    read_loop = [&read_loop, &out, &recv_buffer, &total_received] {
        out.async_read_some(boost::asio::buffer(recv_buffer),
            [&](error_code ec, size_t transferred) {
                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
                total_received += transferred;
                if (!ec)
                    read_loop(); // continue reading
            });
    };

    // Composed operation: completes once all of `data` has been written.
    boost::asio::async_write(in, boost::asio::buffer(data),
        [&](error_code ec, size_t transferred) {
            std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\n";
            in.close(ec); // closing stdin signals end of input to the child
            std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\n";
        }); // async

    read_loop(); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\n";
}

#include <fstream>
#include <iterator>
std::vector<char> read_file(std::string const& fname) {
    std::ifstream ifs(fname);
    return {std::istreambuf_iterator<char>(ifs), {}};
}
Prints
WriteLoop: Success done, 1787 bytes
WriteLoop: closed pipe (Success)
ReadLoop: Success got 4096 bytes
ReadLoop: Success got 3515 bytes
ReadLoop: End of file got 0 bytes
Process exitcode 0, total_received=7611
Explanation, Simplifications
Note that we did all of the writing without a loop. That is because boost::asio::async_write is a composed operation (it hides the loop).
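As a rough picture of what "hides the loop" means, here is a synchronous toy version. It is not how Asio implements async_write; `ToySink`, its `write_some` and `write_all` are hypothetical names, and the only assumption is that a single low-level write may accept fewer bytes than it was offered.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

// Hypothetical sink that accepts at most `chunk` bytes per call, the way a
// pipe may take only part of what it is offered.
struct ToySink {
    std::string received;
    std::size_t chunk = 4;
    std::size_t calls = 0;

    // Accepts up to `chunk` bytes and returns how many were taken.
    std::size_t write_some(const char* data, std::size_t len) {
        std::size_t n = std::min(len, chunk);
        received.append(data, n);
        ++calls;
        return n;
    }
};

// The loop that a composed "write everything" operation takes care of:
// keep calling write_some until every byte has been accepted.
inline std::size_t write_all(ToySink& sink, const std::string& data) {
    std::size_t done = 0;
    while (done < data.size())
        done += sink.write_some(data.data() + done, data.size() - done);
    return done;
}
```

Writing the 11-byte string "hello world" through a sink with the default chunk of 4 takes three write_some calls (4 + 4 + 3 bytes); the caller of write_all only sees the final total.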
Likewise, if you can "afford" to keep all of the received data in memory, you can simplify by using boost::asio::streambuf together with a similar composed operation:
Live On Coliru // trouble on Coliru
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
#include <string>
#include <vector>

std::vector<char> read_file(std::string const&);

namespace bp = boost::process;
using boost::system::error_code;

int main() {
    boost::asio::io_service svc;

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);

    auto data = read_file("main.cpp");

    // Composed read: keeps reading into the streambuf until an error such as EOF.
    boost::asio::streambuf recv_buffer;
    boost::asio::async_read(out, recv_buffer,
        [&](error_code ec, size_t transferred) {
            std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
        });

    boost::asio::async_write(in, boost::asio::buffer(data),
        [&](error_code ec, size_t transferred) {
            std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\n";
            in.close(ec);
            std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\n";
        }); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << recv_buffer.size() << "\n";
}

#include <fstream>
#include <iterator>
std::vector<char> read_file(std::string const& fname) {
    std::ifstream ifs(fname);
    return {std::istreambuf_iterator<char>(ifs), {}};
}
Conversely, if you cannot afford to have all the data in memory before sending, you can create a loop to send input block-wise
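Two Asynchronous Loops, With Delays
Let's do that, and make it more entertaining by delaying for a second before writing each block. Because of the delays, what you would expect to see is reads and writes alternating:
Live On Coliru // yay, running on Coliru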
#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <boost/function.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <array>
#include <chrono>
#include <fstream>
#include <iostream>
#include <string>
#include <system_error>

namespace bp = boost::process;
using boost::system::error_code;
using namespace std::chrono_literals;

using Loop = boost::function<void()>;
using Buffer = std::array<char, 500>;

int main() {
    boost::asio::io_service svc;

    auto on_exit = [](int code, std::error_code ec) {
        std::cout << "Exited " << code << " (" << ec.message() << ")\n";
    };

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc, bp::on_exit(on_exit));

    Loop read_loop, write_loop;

    Buffer recv_buffer;
    std::size_t total_received = 0;
    read_loop = [&read_loop, &out, &recv_buffer, &total_received] {
        out.async_read_some(boost::asio::buffer(recv_buffer),
            [&](error_code ec, size_t transferred) {
                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
                total_received += transferred;
                if (!ec)
                    read_loop(); // continue reading
            });
    };

    std::ifstream ifs("main.cpp");
    std::size_t total_written = 0;
    Buffer send_buffer;
    boost::asio::high_resolution_timer send_delay(svc);
    write_loop = [&write_loop, &in, &ifs, &send_buffer, &total_written, &send_delay] {
        // Once the input file is exhausted, close the child's stdin and stop.
        if (!ifs.good())
        {
            error_code ec;
            in.close(ec);
            std::cout << "WriteLoop: closed stdin (" << ec.message() << ")\n";
            return;
        }

        // Read the next block; gcount() is the number of bytes actually read.
        ifs.read(send_buffer.data(), send_buffer.size());

        boost::asio::async_write(in, boost::asio::buffer(send_buffer.data(), ifs.gcount()),
            [&](error_code ec, size_t transferred) {
                std::cout << "WriteLoop: " << ec.message() << " sent " << transferred << " bytes\n";
                total_written += transferred;
                if (!ec) {
                    // Wait one second, then send the next block.
                    send_delay.expires_from_now(1s);
                    send_delay.async_wait([&write_loop](error_code ec) {
                        std::cout << "WriteLoop: send delay " << ec.message() << "\n";
                        if (!ec) write_loop(); // continue writing
                    });
                }
            });
    };

    read_loop(); // async
    write_loop(); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\n";
}
Prints
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 96 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 96 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 134 bytes
WriteLoop: send delay Success
WriteLoop: closed stdin (Success)
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 22 bytes
Exited 0 (Success)
ReadLoop: End of file got 0 bytes
Process exitcode 0, total_received=11214
¹ Perhaps merely unspecified; I am not inclined to work out the difference right now.