进程为 运行 时是否可以使用进程输出?
Is is possible to use process output while process is running?
Boost.process 允许使用 Boost.asio 来执行异步读取。
据我了解,这对于在进程 运行ning 期间读取输出很有用,而无需等待进程终止。
但是要访问此输出,是否需要等待进程终止,或者是否可以在进程 运行ning 期间访问它,如何访问?
实际上我的需要是访问进程输出的开头(检查它是否按预期启动),同时保持它 运行ning.
为了详细说明上下文,我运行一个我想保留到执行结束的过程:
boost::asio::io_service ios;
std::vector<char> buf;
bp::child c("process_that_needs_to_keep_running", args,
bp::std_out > boost::asio::buffer(buf), ios);
ios.run();
// I DON'T WANT WAIT FOR c TO TERMINATE
// but I want to check that buf contains some text that ensures me it started correctly
// the issue I have here is that I don't know how to read from buf, since its size and content might not be consistent
// is it possible to take a snapshot for instance?
check_started_correctly(buf);
这里的问题是生产者创建了我无法控制的输出,我只是发布输出。
如果您使用 bp::std_out > some_kind_of_buffer_or_future
,您通常只会在退出时得到结果。
但是,您可以使用 async_pipe
:
bp::async_pipe pipe(io);
bp::child c( //
"/bin/bash",
std::vector<std::string>{
"-c",
"for a in {1..20}; do sleep 1; echo message $a; done",
}, //
bp::std_out > pipe, //
bp::on_exit(on_exit), //
io);
现在,您必须显式在该管道上执行 IO:
boost::asio::streambuf sb;
async_read_until( //
pipe, sb, "message 5\n", //
[&](error_code ec, size_t) { //
std::cout << "Got message 5 (" << ec.message() << ")" << std::endl;
});
这个有效:
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <boost/asio.hpp>
#include <iostream>
namespace bp = boost::process;
using boost::system::error_code;
namespace /*file-static*/ {
using namespace std::chrono_literals;
static auto now = std::chrono::steady_clock::now;
static const auto t0 = now();
static auto timestamp() {
return std::to_string((now() - t0) / 1.s) + "s ";
}
} // namespace
int main() {
boost::asio::io_context io;
bp::async_pipe pipe(io);
auto on_exit = [](int code, std::error_code ec) {
std::cout << timestamp() << "on_exit: " << ec.message() << " code "
<< code << std::endl;
};
bp::child c( //
"/bin/bash",
std::vector<std::string>{
"-c",
"for a in {1..20}; do sleep 1; echo message $a; done",
}, //
bp::std_out > pipe, //
bp::on_exit(on_exit), //
io);
boost::asio::streambuf sb;
async_read_until( //
pipe, sb, "message 5\n", //
[&](error_code ec, size_t) { //
std::cout << timestamp() << "Got message 5 (" << ec.message() << ")"
<< std::endl;
});
io.run();
}
版画
5.025400s Got message 5 (Success)
20.100547s on_exit: Success code 0
这样您就可以在需要时回复您正在寻找的内容。请记住 OS 并且 shell 在管道上进行流缓冲,但默认值为 line-buffering 因此,您可以期望在打印换行符后立即接收输入。
大缓冲区?
以上有点假设您可以缓冲整个输出直到有趣的消息。如果那是千兆字节呢?只要您的模式不是千兆字节,您就可以继续阅读,直到符合条件为止。
让我们将我们的示例变成一个异步 grep,它在所有提升 headers 中查找正则表达式 class\s*\w+_heap
。当然,这是很多兆字节的数据,但我们只使用了 10Kb 的缓冲区:
std::string text;
auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte
size_t total_received =0;
boost::regex const re(R"(class\s*\w+_heap)");
现在我们创建一个读取循环,当缓冲区已满时读取直到匹配或:
std::function<void()> wait_for_message;
wait_for_message = [&] {
async_read_until( //
pipe, buf, re, //
[&](error_code ec, size_t received) { //
std::cerr << '\x0d' << timestamp() << "Checking for message ("
<< ec.message() << ", total " << total_received
<< ") ";
if (received || ec != boost::asio::error::not_found) {
total_received += received;
buf.consume(received);
boost::smatch m;
if (regex_search(text, m, re)) {
std::cout << "\n" << timestamp()
<< "Found: " << std::quoted(m.str()) << " at "
<< (total_received - m.size()) << " bytes"
<< std::endl;
}
} else {
// discard 90% of buffer capacity
auto discard =
std::min(buf.max_size() / 10 * 9, buf.size());
total_received += discard;
buf.consume(discard);
}
if (!ec | (ec == boost::asio::error::not_found))
wait_for_message();
else
std::cout << "\n" << timestamp() << ec.message() << std::endl;
});
};
当然,这个系统可能会错过匹配 if 匹配超过缓冲区大小的 10%(因为我们只保留之前缓冲区内容的 10% 以允许匹配重叠读取边界)。
再看一遍Live On Coliru
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <boost/asio.hpp>
#include <boost/regex.hpp>
#include <iostream>
#include <iomanip>
namespace bp = boost::process;
using boost::system::error_code;
namespace /*file-static*/ {
using namespace std::chrono_literals;
static auto now = std::chrono::steady_clock::now;
static const auto t0 = now();
static auto timestamp() {
return std::to_string((now() - t0) / 1.s) + "s ";
}
} // namespace
int main() {
boost::asio::io_context io;
bp::async_pipe pipe(io);
auto on_exit = [](int code, std::error_code ec) {
std::cout << timestamp() << "on_exit: " << ec.message() << " code "
<< code << std::endl;
};
bp::child c( //
"/usr/bin/find",
std::vector<std::string>{"/usr/local/include/boost", "-name",
"*.hpp", "-exec", "cat", "{}", "+"},
bp::std_out > pipe, //
bp::on_exit(on_exit), //
io);
std::string text;
auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte
size_t total_received =0;
boost::regex const re(R"(class\s*\w+_heap)");
std::function<void()> wait_for_message;
wait_for_message = [&] {
async_read_until( //
pipe, buf, re, //
[&](error_code ec, size_t received) { //
std::cerr << '\x0d' << timestamp() << "Checking for message ("
<< ec.message() << ", total " << total_received
<< ") ";
if (received || ec != boost::asio::error::not_found) {
total_received += received;
buf.consume(received);
boost::smatch m;
if (regex_search(text, m, re)) {
std::cout << "\n" << timestamp()
<< "Found: " << std::quoted(m.str()) << " at "
<< (total_received - m.size()) << " bytes"
<< std::endl;
}
} else {
// discard 90% of buffer capacity
auto discard =
std::min(buf.max_size() / 10 * 9, buf.size());
total_received += discard;
buf.consume(discard);
}
if (!ec | (ec == boost::asio::error::not_found))
wait_for_message();
else
std::cout << "\n" << timestamp() << ec.message() << std::endl;
});
};
wait_for_message();
io.run();
std::cout << timestamp() << " - Done, total_received: " << total_received << "\n";
}
打印
2.033324s Found: "class d_ary_heap" at 6747512 bytes
2.065290s Found: "class pairing_heap" at 6831390 bytes
2.071888s Found: "class binomial_heap" at 6860833 bytes
2.072715s Found: "class skew_heap" at 6895677 bytes
2.073348s Found: "class fibonacci_heap" at 6921559 bytes
34.729355s End of file
34.730515s on_exit: Success code 0
34.730593s - Done, total_received: 154746011
或者在我的机器上直播:
Boost.process 允许使用 Boost.asio 来执行异步读取。
据我了解,这对于在进程 运行ning 期间读取输出很有用,而无需等待进程终止。
但是要访问此输出,是否需要等待进程终止,或者是否可以在进程 运行ning 期间访问它,如何访问?
实际上我的需要是访问进程输出的开头(检查它是否按预期启动),同时保持它 运行ning.
为了详细说明上下文,我运行一个我想保留到执行结束的过程:
boost::asio::io_service ios;
std::vector<char> buf;
bp::child c("process_that_needs_to_keep_running", args,
bp::std_out > boost::asio::buffer(buf), ios);
ios.run();
// I DON'T WANT WAIT FOR c TO TERMINATE
// but I want to check that buf contains some text that ensures me it started correctly
// the issue I have here is that I don't know how to read from buf, since its size and content might not be consistent
// is it possible to take a snapshot for instance?
check_started_correctly(buf);
这里的问题是生产者创建了我无法控制的输出,我只是发布输出。
如果您使用 bp::std_out > some_kind_of_buffer_or_future
,您通常只会在退出时得到结果。
但是,您可以使用 async_pipe
:
bp::async_pipe pipe(io);
bp::child c( //
"/bin/bash",
std::vector<std::string>{
"-c",
"for a in {1..20}; do sleep 1; echo message $a; done",
}, //
bp::std_out > pipe, //
bp::on_exit(on_exit), //
io);
现在,您必须显式在该管道上执行 IO:
boost::asio::streambuf sb;
async_read_until( //
pipe, sb, "message 5\n", //
[&](error_code ec, size_t) { //
std::cout << "Got message 5 (" << ec.message() << ")" << std::endl;
});
这个有效:
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <boost/asio.hpp>
#include <iostream>
namespace bp = boost::process;
using boost::system::error_code;
namespace /*file-static*/ {
using namespace std::chrono_literals;
static auto now = std::chrono::steady_clock::now;
static const auto t0 = now();
static auto timestamp() {
return std::to_string((now() - t0) / 1.s) + "s ";
}
} // namespace
int main() {
boost::asio::io_context io;
bp::async_pipe pipe(io);
auto on_exit = [](int code, std::error_code ec) {
std::cout << timestamp() << "on_exit: " << ec.message() << " code "
<< code << std::endl;
};
bp::child c( //
"/bin/bash",
std::vector<std::string>{
"-c",
"for a in {1..20}; do sleep 1; echo message $a; done",
}, //
bp::std_out > pipe, //
bp::on_exit(on_exit), //
io);
boost::asio::streambuf sb;
async_read_until( //
pipe, sb, "message 5\n", //
[&](error_code ec, size_t) { //
std::cout << timestamp() << "Got message 5 (" << ec.message() << ")"
<< std::endl;
});
io.run();
}
版画
5.025400s Got message 5 (Success)
20.100547s on_exit: Success code 0
这样您就可以在需要时回复您正在寻找的内容。请记住 OS 并且 shell 在管道上进行流缓冲,但默认值为 line-buffering 因此,您可以期望在打印换行符后立即接收输入。
大缓冲区?
以上有点假设您可以缓冲整个输出直到有趣的消息。如果那是千兆字节呢?只要您的模式不是千兆字节,您就可以继续阅读,直到符合条件为止。
让我们将我们的示例变成一个异步 grep,它在所有提升 headers 中查找正则表达式 class\s*\w+_heap
。当然,这是很多兆字节的数据,但我们只使用了 10Kb 的缓冲区:
std::string text;
auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte
size_t total_received =0;
boost::regex const re(R"(class\s*\w+_heap)");
现在我们创建一个读取循环,当缓冲区已满时读取直到匹配或:
std::function<void()> wait_for_message;
wait_for_message = [&] {
async_read_until( //
pipe, buf, re, //
[&](error_code ec, size_t received) { //
std::cerr << '\x0d' << timestamp() << "Checking for message ("
<< ec.message() << ", total " << total_received
<< ") ";
if (received || ec != boost::asio::error::not_found) {
total_received += received;
buf.consume(received);
boost::smatch m;
if (regex_search(text, m, re)) {
std::cout << "\n" << timestamp()
<< "Found: " << std::quoted(m.str()) << " at "
<< (total_received - m.size()) << " bytes"
<< std::endl;
}
} else {
// discard 90% of buffer capacity
auto discard =
std::min(buf.max_size() / 10 * 9, buf.size());
total_received += discard;
buf.consume(discard);
}
if (!ec | (ec == boost::asio::error::not_found))
wait_for_message();
else
std::cout << "\n" << timestamp() << ec.message() << std::endl;
});
};
当然,这个系统可能会错过匹配 if 匹配超过缓冲区大小的 10%(因为我们只保留之前缓冲区内容的 10% 以允许匹配重叠读取边界)。
再看一遍Live On Coliru
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <boost/asio.hpp>
#include <boost/regex.hpp>
#include <iostream>
#include <iomanip>
namespace bp = boost::process;
using boost::system::error_code;
namespace /*file-static*/ {
using namespace std::chrono_literals;
static auto now = std::chrono::steady_clock::now;
static const auto t0 = now();
static auto timestamp() {
return std::to_string((now() - t0) / 1.s) + "s ";
}
} // namespace
int main() {
boost::asio::io_context io;
bp::async_pipe pipe(io);
auto on_exit = [](int code, std::error_code ec) {
std::cout << timestamp() << "on_exit: " << ec.message() << " code "
<< code << std::endl;
};
bp::child c( //
"/usr/bin/find",
std::vector<std::string>{"/usr/local/include/boost", "-name",
"*.hpp", "-exec", "cat", "{}", "+"},
bp::std_out > pipe, //
bp::on_exit(on_exit), //
io);
std::string text;
auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte
size_t total_received =0;
boost::regex const re(R"(class\s*\w+_heap)");
std::function<void()> wait_for_message;
wait_for_message = [&] {
async_read_until( //
pipe, buf, re, //
[&](error_code ec, size_t received) { //
std::cerr << '\x0d' << timestamp() << "Checking for message ("
<< ec.message() << ", total " << total_received
<< ") ";
if (received || ec != boost::asio::error::not_found) {
total_received += received;
buf.consume(received);
boost::smatch m;
if (regex_search(text, m, re)) {
std::cout << "\n" << timestamp()
<< "Found: " << std::quoted(m.str()) << " at "
<< (total_received - m.size()) << " bytes"
<< std::endl;
}
} else {
// discard 90% of buffer capacity
auto discard =
std::min(buf.max_size() / 10 * 9, buf.size());
total_received += discard;
buf.consume(discard);
}
if (!ec | (ec == boost::asio::error::not_found))
wait_for_message();
else
std::cout << "\n" << timestamp() << ec.message() << std::endl;
});
};
wait_for_message();
io.run();
std::cout << timestamp() << " - Done, total_received: " << total_received << "\n";
}
打印
2.033324s Found: "class d_ary_heap" at 6747512 bytes
2.065290s Found: "class pairing_heap" at 6831390 bytes
2.071888s Found: "class binomial_heap" at 6860833 bytes
2.072715s Found: "class skew_heap" at 6895677 bytes
2.073348s Found: "class fibonacci_heap" at 6921559 bytes
34.729355s End of file
34.730515s on_exit: Success code 0
34.730593s - Done, total_received: 154746011
或者在我的机器上直播: