Chrome 使用 Boost Beast 的 DevTools 协议
Chrome DevTools Protocol using Boost Beast
我正在尝试使用 Web 套接字客户端连接到使用 Boost 库的服务器。情况是服务器有时会发送预定数量的 JSON 消息,但有时会更多。
我在 Stack Overflow 上找到了 @sehe 发布的解决方案(可在此处找到)。如果我确定发回的消息数量是 1、2、3 等,它对我来说效果很好。
但是如果出现以下情况则效果不佳:
- 您指定接收的消息数量少于实际数量;多出的消息不会“立即”得到,而是被附加到下一次读取中
- 您指定的消息数量多于实际数量;程序会卡住,一直等待消息
我对 Boost 网站上的 async example client 进行了一些挖掘和测试。对于 1 条消息,它工作得“很好”。在线程或计时器中使用该示例将触发来自 Boost 的断言。
对我来说,理想的解决方案是@sehe 发布的内容,简短、简单;但它应该读取“所有”发回的消息。我意识到只有当您“知道”消息流何时“结束”时才能做到这一点,但由于我缺乏在 C++ 中使用 Boost 和 Web 套接字的经验,我迷路了。
请告知此目的的解决方案是什么。重申一下:
- 发送命令
- 等待回复;读取所有响应(即使有 10 个 JSON 对象)
非常感谢
针对评论/聊天中的讨论,我准备了¹一个示例:把 https://github.com/aslushnikov/getting-started-with-cdp#targets--sessions 中的示例直接用 Beast 翻译成 C++。
请注意,它使用命令 ID 将响应与请求相关联。另请注意,这些是特定于会话的,因此如果您必须支持多个会话,则需要考虑到这一点。
#1:回调样式
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/signals2.hpp>
#include <iostream>
#include <deque>
#include <ranges>
#include <boost/json.hpp>
//#include <boost/json/src.hpp> // for header-only
namespace json = boost::json;
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
namespace r = std::ranges;
static std::ostream debug(/*nullptr*/ std::cerr.rdbuf());
using namespace std::chrono_literals;
using boost::signals2::scoped_connection;
using boost::system::error_code;
using net::ip::tcp;
// Sends a WebSocket message and prints the response
class CWebSocket_Sync {
websocket::stream<tcp::socket> ws_;
public:
using executor_type = net::any_io_executor;
executor_type get_executor() { return ws_.get_executor(); }
// Resolver and socket require an io_context
explicit CWebSocket_Sync(executor_type ex) : ws_(make_strand(ex)) {}
// call backs are on the strand, not on the main thread
boost::signals2::signal<void(json::object const&)> onMessage;
// public functions not assumed to be on the strand
void Connect(std::string const& host, std::string const& port, std::string const& path)
{
post(get_executor(), [=, this] {
tcp::resolver resolver_(get_executor());
// TODO async_connect prevents potential blocking wait
// TODO async_handshake (idem)
auto ep = net::connect(ws_.next_layer(), //
resolver_.resolve(host, port));
ws_.handshake(host + ':' + std::to_string(ep.port()), path);
do_receive_loop();
});
}
void ServerCommand(json::object const& cmd)
{
post(get_executor(), [text = serialize(cmd), this] {
outbox_.push_back(text);
if (outbox_.size() == 1) // not already sending?
do_send_loop();
});
}
void CloseConnection() {
post(get_executor(), [this] {
ws_.next_layer().cancel();
ws_.async_close(websocket::close_code::normal, [](error_code ec) {
debug << "CloseConnection (" << ec.message() << ")" << std::endl;
});
});
}
private:
// do_XXXX functions assumed to be on the strand
beast::flat_buffer inbox_;
void do_receive_loop() {
debug << "do_receive_loop..." << std::endl;
ws_.async_read(inbox_, [this](error_code ec, size_t n) {
debug << "Received " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec) {
auto text = inbox_.cdata();
auto parsed = json::parse(
{buffer_cast<char const*>(text), text.size()}, ec);
inbox_.clear();
if (!ec) {
assert(parsed.is_object());
onMessage(parsed.as_object()); // exceptions will blow up
do_receive_loop();
} else {
debug << "Ignore failed parse (" << ec.message() << ")" << std::endl;
}
}
});
}
std::deque<std::string> outbox_;
void do_send_loop() {
debug << "do_send_loop " << outbox_.size() << std::endl;
if (outbox_.empty())
return;
ws_.async_write( //
net::buffer(outbox_.front()), [this](error_code ec, size_t n) {
debug << "Sent " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec) {
outbox_.pop_front();
do_send_loop();
}
});
}
};
int main()
{
net::thread_pool ioc(1);
CWebSocket_Sync client(ioc.get_executor());
client.Connect("localhost", "9222", "/devtools/browser/bb8efece-b445-42d0-a4cc-349fccd8514d");
auto trace = client.onMessage.connect([&](json::object const& obj) {
debug << "Received " << obj << std::endl;
});
unsigned id = 1; // TODO make per session
scoped_connection sub = client.onMessage.connect([&](json::object const& obj) {
if ((obj.contains("id") && obj.at("id") == 1)) {
auto& infos = obj.at("result").at("targetInfos").as_array();
if (auto pageTarget = r::find_if(infos,
[](auto& info) { return info.at("type") == "page"; })) //
{
std::cout << "pageTarget " << *pageTarget << std::endl;
sub = client.onMessage.connect([&](json::object const& obj) {
// idea:
// if(obj.contains("method") && obj.at("method") == "Target.attachedToTarget"))
if (obj.contains("id") && obj.at("id") == 2) {
auto sessionId = value_to<std::string>(obj.at("result").at("sessionId"));
std::cout << "sessionId: " << sessionId << std::endl;
sub.release(); // stop expecting a specific response
client.ServerCommand({
{"sessionId", sessionId},
{"id", 1}, // IDs are independent between sessions
{"method", "Page.navigate"},
{"params", json::object{
{"url", "
}},
});
}
});
client.ServerCommand(
{{"id", id++},
{"method", "Target.attachToTarget"},
{
"params",
json::object{
{"targetId", pageTarget->at("targetId")},
{"flatten", true},
},
}});
}
}
});
client.ServerCommand({
{"id", id++},
{"method", "Target.getTargets"},
});
std::this_thread::sleep_for(5s);
client.CloseConnection();
ioc.join();
}
测试时(我暂时硬编码了 websocket URL);
完整的输出是:
do_receive_loop...
do_send_loop 1
Sent 37 bytes (Success)
do_send_loop 0
Received 10138 bytes (Success)
Received {"id":1,"result":{"targetInfos":[{"targetId":"53AC5A92902F306C626CF3B3A2BB1878","type":"page","title":"Google","url":"https://www.google.com/","attached":false,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"},{"targetId":"D945FE9AC3EBF060805A90097DF2D7EF","type":"page","title":"(1) WhatsApp","url":"https://web.whatsapp.com/","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"6DBC2EDCADF891A4A68FA9A878AAA574","type":"page","title":"aslushnikov/getting-started-with-cdp: Getting Started With Chrome DevTools Protocol","url":"https://github.com/aslushnikov/getting-started-with-cdp#targets--sessions","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"35BE8DA1EE5A0F51EDEF9AA71738968C","type":"background_page","title":"Gnome-shell-integratie","url":"chrome-extension://gphhapmejobijbbhgpjhcjognlahblep/extension.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"477A0D3805F436D95C9D6DC0760862C1","type":"background_page","title":"uBlock Origin","url":"chrome-extension://cjpalhdlnbpafiamejdnhcphjbkeiagm/background.html","attached":false,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"},{"targetId":"B1371BC4FA5117900C2ABF28C69E3098","type":"page","title":"On Software and Languages: Holy cow, I wrote a book!","url":"http://ib-krajewski.blogspot.com/2019/02/holy-cow-i-wrote-book.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"1F3A58D579C18DDD819EF46EBBB0AD4C","type":"page","title":"c++ - Boost Beast Websocket - Send and Read until no more data - Stack Overflow","url":" 
Reader","url":"chrome-extension://eimadpbcbfnmbkopoojfekhnkhdbieeh/background/index.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"9612E681CCF4E4E47D400B0849FA05E6","type":"background_page","title":"uBlock Origin","url":"chrome-extension://cjpalhdlnbpafiamejdnhcphjbkeiagm/background.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"}]}}
pageTarget {"targetId":"53AC5A92902F306C626CF3B3A2BB1878","type":"page","title":"Google","url":"https://www.google.com/","attached":false,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"}
do_receive_loop...
do_send_loop 1
Sent 113 bytes (Success)
do_send_loop 0
Received 339 bytes (Success)
Received {"method":"Target.attachedToTarget","params":{"sessionId":"29AD9FFD2EAE70BAF10076A9E05DD000","targetInfo":{"targetId":"53AC5A92902F306C626CF3B3A2BB1878","type":"page","title":"Google","url":"https://www.google.com/","attached":true,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"},"waitingForDebugger":false}}
do_receive_loop...
Received 66 bytes (Success)
Received {"id":2,"result":{"sessionId":"29AD9FFD2EAE70BAF10076A9E05DD000"}}
sessionId: 29AD9FFD2EAE70BAF10076A9E05DD000
do_receive_loop...
do_send_loop 1
Sent 142 bytes (Success)
do_send_loop 0
Received 157 bytes (Success)
Received {"id":1,"result":{"frameId":"53AC5A92902F306C626CF3B3A2BB1878","loaderId":"A3680FBE84DEBDA3444FFA6CD7C5A5A5"},"sessionId":"29AD9FFD2EAE70BAF10076A9E05DD000"}
do_receive_loop...
Received 0 bytes (Operation canceled)
CloseConnection (Operation canceled)
#2:Promises/Future 风格
我创建了一个 Request 方法,它像 nodejs 示例那样返回一个 future:
// Sends `cmd` and returns a future for the message whose msgId() equals the
// command's. The expectation is registered before the command is sent.
std::future<json::object> Request(json::object const& cmd)
{
    json::object const expectedKey = msgId(cmd);
    auto matchesRequest = [expectedKey](json::object const& reply) {
        return expectedKey == msgId(reply);
    };

    std::future<json::object> pending = Expect(matchesRequest);
    Send(cmd);
    return pending;
}
请注意它是如何通过添加 msgId
提取助手变得更加优雅的:
// Extracts the properties that identify a request/response pair.
static json::object msgId(json::object const& message)
{
    // TODO more ?
    json::object key = filtered(message, {"id", "sessionId"});
    return key;
}
这巧妙地促进了多会话响应,其中 "id"
不需要在不同的 "sessionId"
中是唯一的。条件保持简单 if (msgId(msg) == id)
.
它还使用 Send
和 Expect
作为积木:
// Serializes `cmd` on the calling thread and queues the text on the strand;
// the send loop is started only when nothing was queued before.
void Send(json::object const& cmd)
{
    post(get_executor(), [this, payload = serialize(cmd)] {
        bool const wasIdle = outbox_.empty(); // not already sending?
        outbox_.push_back(payload);
        if (wasIdle)
            do_send_loop();
    });
}
template <typename F> std::future<json::object> Expect(F&& pred)
{
struct State {
boost::signals2::connection _subscription;
std::promise<json::object> _promise;
};
auto state = std::make_shared<State>();
state->_subscription = onMessage.connect( //
[=, pred = std::forward<F>(pred)](json::object const& msg) {
if (pred(msg)) {
state->_promise.set_value(msg);
state->_subscription.disconnect();
}
});
return state->_promise.get_future();
}
现在主程序不必再“倒着”写了:
auto targets = client.Request({
{"id", id++},
{"method", "Target.getTargets"},
}).get().at("result").at("targetInfos");
auto pageTarget = r::find_if(targets.as_array(), [](auto& info) {
return info.at("type") == "page";
});
if (!pageTarget) {
std::cerr << "No page target\n";
return 0;
}
std::cout << "pageTarget " << *pageTarget << std::endl;
auto sessionId = client.Request(
{{"id", id++},
{"method", "Target.attachToTarget"},
{"params", json::object{
{"targetId", pageTarget->at("targetId")},
{"flatten", true},
},
}})
.get().at("result").at("sessionId");
std::cout << "sessionId: " << sessionId << std::endl;
auto response = client.Request({
{"sessionId", sessionId},
{"id", 1}, // IDs are independent between sessions
{"method", "Page.navigate"},
{"params", json::object{
{"url", "
}},
}) .get();
std::cout << "Navigation response: " << response << std::endl;
这导致输出如下:
-- trace {"id":1,"result":{"targetInfos":[{"targetId":"35BE8DA1EE5A0F51EDEF9AA71738968C","type":"background_page","title":"Gnom....
pageTarget {"targetId":"1F3A58D579C18DDD819EF46EBBB0AD4C","type":"page","title":"c++ - Boost Beast Websocket - Send and Read unt....
-- trace {"method":"Target.attachedToTarget","params":{"sessionId":"58931793102C2A5576E4D5D6CDC3D601","targetInfo":{"targetId":....
-- trace {"id":2,"result":{"sessionId":"58931793102C2A5576E4D5D6CDC3D601"}}
sessionId: "58931793102C2A5576E4D5D6CDC3D601"
-- trace {"id":1,"result":{"frameId":"1F3A58D579C18DDD819EF46EBBB0AD4C","loaderId":"9E70C5AAF0B5A503BA2770BB73A4FEC3"},"session....
Navigation response: {"id":1,"result":{"frameId":"1F3A58D579C18DDD819EF46EBBB0AD4C","loaderId":"9E70C5AAF0B5A503BA2770BB73A4FEC3....
针对后续评论:
I would have one last question if you do not mind? Can I use somehow std::future<T>::wait_until
so I can find out if the page was loaded completely? (for example checking for Network.loadingFinished
object)?
当然,只需编码即可:
{
    // Becomes ready once the first "Network.loadingFinished" event arrived.
    std::promise<void> promise;
    bool seen = false; // touched only by the slot below

    scoped_connection sub =
        client.onMessage.connect([&](json::object const& msg) {
            // if_contains returns null for messages without a "method" key
            // (e.g. command responses), so check before dereferencing.
            auto m = msg.if_contains("method");
            if (!seen && m && *m == "Network.loadingFinished") {
                seen = true; // set_value may be called only once
                promise.set_value();
            }
        });

    auto loadingFinished = promise.get_future();
    loadingFinished.wait(); // OR:
    loadingFinished.wait_for(5s); // OR:
    loadingFinished.wait_until(std::chrono::steady_clock::now() + 1min);
}
或者同时取得消息内容:
{
    // Becomes ready with the first "Network.loadingFinished" event message.
    std::promise<json::object> promise;
    bool seen = false; // touched only by the slot below

    scoped_connection sub =
        client.onMessage.connect([&](json::object const& msg) {
            // if_contains returns null for messages without a "method" key
            // (e.g. command responses), so check before dereferencing.
            auto m = msg.if_contains("method");
            if (!seen && m && *m == "Network.loadingFinished") {
                seen = true; // set_value may be called only once
                promise.set_value(msg);
            }
        });

    auto message = promise.get_future().get();
}
当然,你可以(也应该)考虑把它再次封装到类的方法中。
UPDATE - I have since refactored the original futures code to use these as building blocks (Expect
, Send
together make Request
)
现在你可以
// Subscribe for the event first, then block until it has been received.
auto pendingEvent = client.Expect(isMethod("Network.loadingFinished"));
auto loadingFinished = pendingEvent.get();
std::cout << "Got: " << loadingFinished << "\n";
当然,假设有一个像这样的小帮手:
// Builds a predicate that accepts messages whose "method" property equals
// `value`; messages without a "method" property are rejected.
auto isMethod = [](auto value) {
    return [value](json::object const& msg) {
        if (auto const* method = msg.if_contains("method"))
            return *method == value;
        return false;
    };
};
作为奖励,持续监控特定消息:
enum ActionResult { ContinueMonitoring, StopMonitoring };
template <typename A, typename F>
auto Monitor(A action, F&& filter = [](auto&&) noexcept { return true; })
{
struct State {
boost::signals2::connection _subscription;
std::promise<json::object> _promise;
};
auto state = std::make_shared<State>();
auto stop = [state] { state->_subscription.disconnect(); };
state->_subscription = onMessage.connect( //
[=, filter = std::forward<F>(filter)](json::object const& msg) {
if (filter(msg) && StopMonitoring == action(msg))
stop();
});
return stop; // gives the caller an "external" way to stop the monitor
}
一个人为的用法示例:
// Monitor until 3 messages having an id divisible by 7 have been received,
// or until 5 seconds have passed, whichever comes first.
std::atomic_int count = 0;

auto hasIdDivisibleBy7 = [](json::object const& msg) {
    auto const* id = msg.if_contains("id");
    return id && (id->as_int64() % 7 == 0);
};

auto reportAndCount = [&count](json::object const& msg) {
    std::cout << "Divisable by 7: " << msg << "\n";
    if (++count >= 3)
        return CDPClient::StopMonitoring;
    return CDPClient::ContinueMonitoring;
};

auto stopMonitor = client.Monitor(reportAndCount, hasIdDivisibleBy7);

std::this_thread::sleep_for(5s);
stopMonitor(); // even if 3 messages had not been reached, stop the monitor
std::cout << count << " messages having an id divisable by 7 had been received in 5s\n";
完整代码清单(future 版)
遗憾的是,它超出了 Compiler Explorer 的限制:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/json.hpp>
//#include <boost/json/src.hpp> // for header-only
#include <boost/signals2.hpp>
#include <deque>
#include <iostream>
#include <ranges>
namespace json = boost::json;
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
namespace r = std::ranges;
static std::ostream debug(nullptr); // std::cerr.rdbuf()
static const auto filtered(json::object const& obj,
std::initializer_list<json::string_view> props)
{
boost::json::object result;
for (auto prop : props)
if (auto const* v = obj.if_contains(prop))
result[prop] = *v;
return result;
}
using namespace std::chrono_literals;
using boost::signals2::scoped_connection;
using boost::system::error_code;
using net::ip::tcp;
// Sends a WebSocket message and prints the response
class CDPClient {
websocket::stream<tcp::socket> ws_;
public:
using executor_type = net::any_io_executor;
executor_type get_executor() { return ws_.get_executor(); }
// Resolver and socket require an io_context
explicit CDPClient(executor_type ex) : ws_(make_strand(ex)) {}
// call backs are on the strand, not on the main thread
boost::signals2::signal<void(json::object const&)> onMessage;
// public functions not assumed to be on the strand
void Connect(std::string const& host, std::string const& port, std::string const& path)
{
post(get_executor(), [=, this] {
tcp::resolver resolver_(get_executor());
// TODO async_connect prevents potential blocking wait
// TODO async_handshake (idem)
auto ep = net::connect(ws_.next_layer(), //
resolver_.resolve(host, port));
ws_.handshake(host + ':' + std::to_string(ep.port()), path);
do_receive_loop();
});
}
void Send(json::object const& cmd)
{
post(get_executor(), [text = serialize(cmd), this] {
outbox_.push_back(text);
if (outbox_.size() == 1) // not already sending?
do_send_loop();
});
}
template <typename F> std::future<json::object> Expect(F&& pred)
{
struct State {
boost::signals2::connection _subscription;
std::promise<json::object> _promise;
};
auto state = std::make_shared<State>();
state->_subscription = onMessage.connect( //
[=, pred = std::forward<F>(pred)](json::object const& msg) {
if (pred(msg)) {
state->_promise.set_value(msg);
state->_subscription.disconnect();
}
});
return state->_promise.get_future();
}
static json::object msgId(json::object const& message) {
return filtered(message, {"id", "sessionId"}); // TODO more ?
};
std::future<json::object> Request(json::object const& cmd)
{
auto fut = Expect([id = msgId(cmd)](json::object const& resp) {
return msgId(resp) == id;
});
Send(cmd);
return fut;
}
enum ActionResult { ContinueMonitoring, StopMonitoring };
template <typename A, typename F>
auto Monitor(A action, F&& filter = [](auto&&) noexcept { return true; })
{
struct State {
boost::signals2::connection _subscription;
std::promise<json::object> _promise;
};
auto state = std::make_shared<State>();
auto stop = [state] { state->_subscription.disconnect(); };
state->_subscription = onMessage.connect( //
[=, filter = std::forward<F>(filter)](json::object const& msg) {
if (filter(msg) && StopMonitoring == action(msg))
stop();
});
return stop; // gives the caller an "external" way to stop the monitor
}
void CloseConnection() {
post(get_executor(), [this] {
ws_.next_layer().cancel();
ws_.async_close( //
websocket::close_code::normal, [this](error_code ec) {
debug << "CloseConnection (" << ec.message() << ")" << std::endl;
onMessage.disconnect_all_slots();
});
});
}
private:
// do_XXXX functions assumed to be on the strand
beast::flat_buffer inbox_;
void do_receive_loop() {
debug << "do_receive_loop..." << std::endl;
ws_.async_read(inbox_, [this](error_code ec, size_t n) {
debug << "Received " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec) {
auto text = inbox_.cdata();
auto parsed = json::parse(
{buffer_cast<char const*>(text), text.size()}, ec);
inbox_.clear();
if (!ec) {
assert(parsed.is_object());
onMessage(parsed.as_object()); // exceptions will blow up
do_receive_loop();
} else {
debug << "Ignore failed parse (" << ec.message() << ")" << std::endl;
}
}
});
}
std::deque<std::string> outbox_;
void do_send_loop() {
debug << "do_send_loop " << outbox_.size() << std::endl;
if (outbox_.empty())
return;
ws_.async_write( //
net::buffer(outbox_.front()), [this](error_code ec, size_t n) {
debug << "Sent " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec) {
outbox_.pop_front();
do_send_loop();
}
});
}
};
int main()
{
net::thread_pool ioc(1);
CDPClient client(ioc.get_executor());
client.Connect("localhost", "9222", "/devtools/browser/bb8efece-b445-42d0-a4cc-349fccd8514d");
auto trace = client.onMessage.connect([&](json::object const& obj) {
std::cerr << " -- trace " << obj << std::endl;
});
unsigned id = 1; // TODO make per session
auto targets = client.Request({
{"id", id++},
{"method", "Target.getTargets"},
}).get().at("result").at("targetInfos");
auto pageTarget = r::find_if(targets.as_array(), [](auto& info) {
return info.at("type") == "page";
});
if (!pageTarget) {
std::cerr << "No page target\n";
return 0;
}
std::cout << "pageTarget " << *pageTarget << std::endl;
auto sessionId = client.Request(
{{"id", id++},
{"method", "Target.attachToTarget"},
{"params", json::object{
{"targetId", pageTarget->at("targetId")},
{"flatten", true},
},
}})
.get().at("result").at("sessionId");
std::cout << "sessionId: " << sessionId << std::endl;
auto response = client.Request({
{"sessionId", sessionId},
{"id", 1}, // IDs are independent between sessions
{"method", "Page.navigate"},
{"params", json::object{
{"url", "
}},
}) .get();
std::cout << "Navigation response: " << response << std::endl;
auto isMethod = [](auto value) {
return [value](json::object const& msg) {
auto m = msg.if_contains("method");
return m && *m == value;
};
};
auto loadingFinished = client.Expect(isMethod("Network.loadingFinished")).get();
std::cout << "Got: " << loadingFinished << "\n";
// monitor until 3 messages having an id divisable by 7 have been received
std::atomic_int count = 0;
auto stopMonitor = client.Monitor(
[&count](json::object const& msg) {
std::cout << "Divisable by 7: " << msg << "\n";
return ++count >= 3 ? CDPClient::StopMonitoring
: CDPClient::ContinueMonitoring;
},
[](json::object const& msg) {
auto id = msg.if_contains("id");
return id && (0 == id->as_int64() % 7);
});
std::this_thread::sleep_for(5s);
stopMonitor(); // even if 3 messages had not been reached, stop the monitor
std::cout << count << " messages having an id divisable by 7 had been received in 5s\n";
client.CloseConnection();
ioc.join();
}
¹ 中间还抽空吃了顿晚餐
我正在尝试使用 Web 套接字客户端连接到使用 Boost 库的服务器。情况是服务器有时会发送预定数量的 JSON 消息,但有时会更多。
从堆栈溢出中我得到了 @sehe, which can be found here 发布的解决方案。如果我确定发回的消息数量是 1、2、3 等,这对我来说效果很好。
但是如果出现以下情况则效果不佳:
- 您指定接收的消息数量较少;你不会得到 消息“现在”,它将在下次阅读时附加
- 您指定的消息多于预期;它会卡住 等待消息
我对 Boost 网站上的 async example client 进行了一些挖掘和测试。对于 1 条消息,它工作得“很好”。在线程或计时器中使用该示例将触发来自 Boost 的断言。
对我来说,理想的解决方案是@sehe 发布的内容,简短、简单;但它应该读取“所有”发回的消息。我意识到只有当您“知道”消息流何时“结束”时才能做到这一点,但由于我缺乏在 C++ 中使用 Boost 和 Web 套接字的经验,我迷路了。
请告知此目的的解决方案是什么。重申一下:
- 发送命令
- 等待回复;读取所有响应(即使有 10 个 JSON 对象)
非常感谢
响应 the comments/chat I have cooked up¹ an example of a straight-forward translation of the example from e.g. https://github.com/aslushnikov/getting-started-with-cdp#targets--sessions 使用 Beast 进入 C++。
请注意,它使用命令 ID 将响应与请求相关联。另请注意,这些是特定于会话的,因此如果您必须支持多个会话,则需要考虑到这一点。
#1:回调样式
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/signals2.hpp>
#include <iostream>
#include <deque>
#include <ranges>
#include <boost/json.hpp>
//#include <boost/json/src.hpp> // for header-only
namespace json = boost::json;
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
namespace r = std::ranges;
static std::ostream debug(/*nullptr*/ std::cerr.rdbuf());
using namespace std::chrono_literals;
using boost::signals2::scoped_connection;
using boost::system::error_code;
using net::ip::tcp;
// Sends a WebSocket message and prints the response
class CWebSocket_Sync {
websocket::stream<tcp::socket> ws_;
public:
using executor_type = net::any_io_executor;
executor_type get_executor() { return ws_.get_executor(); }
// Resolver and socket require an io_context
explicit CWebSocket_Sync(executor_type ex) : ws_(make_strand(ex)) {}
// call backs are on the strand, not on the main thread
boost::signals2::signal<void(json::object const&)> onMessage;
// public functions not assumed to be on the strand
void Connect(std::string const& host, std::string const& port, std::string const& path)
{
post(get_executor(), [=, this] {
tcp::resolver resolver_(get_executor());
// TODO async_connect prevents potential blocking wait
// TODO async_handshake (idem)
auto ep = net::connect(ws_.next_layer(), //
resolver_.resolve(host, port));
ws_.handshake(host + ':' + std::to_string(ep.port()), path);
do_receive_loop();
});
}
void ServerCommand(json::object const& cmd)
{
post(get_executor(), [text = serialize(cmd), this] {
outbox_.push_back(text);
if (outbox_.size() == 1) // not already sending?
do_send_loop();
});
}
void CloseConnection() {
post(get_executor(), [this] {
ws_.next_layer().cancel();
ws_.async_close(websocket::close_code::normal, [](error_code ec) {
debug << "CloseConnection (" << ec.message() << ")" << std::endl;
});
});
}
private:
// do_XXXX functions assumed to be on the strand
beast::flat_buffer inbox_;
void do_receive_loop() {
debug << "do_receive_loop..." << std::endl;
ws_.async_read(inbox_, [this](error_code ec, size_t n) {
debug << "Received " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec) {
auto text = inbox_.cdata();
auto parsed = json::parse(
{buffer_cast<char const*>(text), text.size()}, ec);
inbox_.clear();
if (!ec) {
assert(parsed.is_object());
onMessage(parsed.as_object()); // exceptions will blow up
do_receive_loop();
} else {
debug << "Ignore failed parse (" << ec.message() << ")" << std::endl;
}
}
});
}
std::deque<std::string> outbox_;
void do_send_loop() {
debug << "do_send_loop " << outbox_.size() << std::endl;
if (outbox_.empty())
return;
ws_.async_write( //
net::buffer(outbox_.front()), [this](error_code ec, size_t n) {
debug << "Sent " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec) {
outbox_.pop_front();
do_send_loop();
}
});
}
};
int main()
{
net::thread_pool ioc(1);
CWebSocket_Sync client(ioc.get_executor());
client.Connect("localhost", "9222", "/devtools/browser/bb8efece-b445-42d0-a4cc-349fccd8514d");
auto trace = client.onMessage.connect([&](json::object const& obj) {
debug << "Received " << obj << std::endl;
});
unsigned id = 1; // TODO make per session
scoped_connection sub = client.onMessage.connect([&](json::object const& obj) {
if ((obj.contains("id") && obj.at("id") == 1)) {
auto& infos = obj.at("result").at("targetInfos").as_array();
if (auto pageTarget = r::find_if(infos,
[](auto& info) { return info.at("type") == "page"; })) //
{
std::cout << "pageTarget " << *pageTarget << std::endl;
sub = client.onMessage.connect([&](json::object const& obj) {
// idea:
// if(obj.contains("method") && obj.at("method") == "Target.attachedToTarget"))
if (obj.contains("id") && obj.at("id") == 2) {
auto sessionId = value_to<std::string>(obj.at("result").at("sessionId"));
std::cout << "sessionId: " << sessionId << std::endl;
sub.release(); // stop expecting a specific response
client.ServerCommand({
{"sessionId", sessionId},
{"id", 1}, // IDs are independent between sessions
{"method", "Page.navigate"},
{"params", json::object{
{"url", "
}},
});
}
});
client.ServerCommand(
{{"id", id++},
{"method", "Target.attachToTarget"},
{
"params",
json::object{
{"targetId", pageTarget->at("targetId")},
{"flatten", true},
},
}});
}
}
});
client.ServerCommand({
{"id", id++},
{"method", "Target.getTargets"},
});
std::this_thread::sleep_for(5s);
client.CloseConnection();
ioc.join();
}
测试时(我暂时硬编码了 websocket URL);
完整的输出是:
do_receive_loop...
do_send_loop 1
Sent 37 bytes (Success)
do_send_loop 0
Received 10138 bytes (Success)
Received {"id":1,"result":{"targetInfos":[{"targetId":"53AC5A92902F306C626CF3B3A2BB1878","type":"page","title":"Google","url":"https://www.google.com/","attached":false,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"},{"targetId":"D945FE9AC3EBF060805A90097DF2D7EF","type":"page","title":"(1) WhatsApp","url":"https://web.whatsapp.com/","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"6DBC2EDCADF891A4A68FA9A878AAA574","type":"page","title":"aslushnikov/getting-started-with-cdp: Getting Started With Chrome DevTools Protocol","url":"https://github.com/aslushnikov/getting-started-with-cdp#targets--sessions","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"35BE8DA1EE5A0F51EDEF9AA71738968C","type":"background_page","title":"Gnome-shell-integratie","url":"chrome-extension://gphhapmejobijbbhgpjhcjognlahblep/extension.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"477A0D3805F436D95C9D6DC0760862C1","type":"background_page","title":"uBlock Origin","url":"chrome-extension://cjpalhdlnbpafiamejdnhcphjbkeiagm/background.html","attached":false,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"},{"targetId":"B1371BC4FA5117900C2ABF28C69E3098","type":"page","title":"On Software and Languages: Holy cow, I wrote a book!","url":"http://ib-krajewski.blogspot.com/2019/02/holy-cow-i-wrote-book.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"1F3A58D579C18DDD819EF46EBBB0AD4C","type":"page","title":"c++ - Boost Beast Websocket - Send and Read until no more data - Stack Overflow","url":" 
Reader","url":"chrome-extension://eimadpbcbfnmbkopoojfekhnkhdbieeh/background/index.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"9612E681CCF4E4E47D400B0849FA05E6","type":"background_page","title":"uBlock Origin","url":"chrome-extension://cjpalhdlnbpafiamejdnhcphjbkeiagm/background.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"}]}}
pageTarget {"targetId":"53AC5A92902F306C626CF3B3A2BB1878","type":"page","title":"Google","url":"https://www.google.com/","attached":false,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"}
do_receive_loop...
do_send_loop 1
Sent 113 bytes (Success)
do_send_loop 0
Received 339 bytes (Success)
Received {"method":"Target.attachedToTarget","params":{"sessionId":"29AD9FFD2EAE70BAF10076A9E05DD000","targetInfo":{"targetId":"53AC5A92902F306C626CF3B3A2BB1878","type":"page","title":"Google","url":"https://www.google.com/","attached":true,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"},"waitingForDebugger":false}}
do_receive_loop...
Received 66 bytes (Success)
Received {"id":2,"result":{"sessionId":"29AD9FFD2EAE70BAF10076A9E05DD000"}}
sessionId: 29AD9FFD2EAE70BAF10076A9E05DD000
do_receive_loop...
do_send_loop 1
Sent 142 bytes (Success)
do_send_loop 0
Received 157 bytes (Success)
Received {"id":1,"result":{"frameId":"53AC5A92902F306C626CF3B3A2BB1878","loaderId":"A3680FBE84DEBDA3444FFA6CD7C5A5A5"},"sessionId":"29AD9FFD2EAE70BAF10076A9E05DD000"}
do_receive_loop...
Received 0 bytes (Operation canceled)
CloseConnection (Operation canceled)
#2:Promises/Future 风格
我创建了一个 Request
方法,returns 未来像 nodejs 示例:
// Sends `cmd` and returns a future for the message whose msgId() equals the
// command's. The expectation is registered before the command is sent.
std::future<json::object> Request(json::object const& cmd)
{
    json::object const expectedKey = msgId(cmd);
    auto matchesRequest = [expectedKey](json::object const& reply) {
        return expectedKey == msgId(reply);
    };

    std::future<json::object> pending = Expect(matchesRequest);
    Send(cmd);
    return pending;
}
请注意它是如何通过添加 msgId
提取助手变得更加优雅的:
// Extracts the properties that identify a request/response pair.
static json::object msgId(json::object const& message)
{
    // TODO more ?
    json::object key = filtered(message, {"id", "sessionId"});
    return key;
}
这巧妙地促进了多会话响应,其中 "id"
不需要在不同的 "sessionId"
中是唯一的。条件保持简单 if (msgId(msg) == id)
.
它还使用 Send
和 Expect
作为积木:
// Serializes `cmd` on the calling thread and queues the text on the strand;
// the send loop is started only when nothing was queued before.
void Send(json::object const& cmd)
{
    post(get_executor(), [this, payload = serialize(cmd)] {
        bool const wasIdle = outbox_.empty(); // not already sending?
        outbox_.push_back(payload);
        if (wasIdle)
            do_send_loop();
    });
}
template <typename F> std::future<json::object> Expect(F&& pred)
{
struct State {
boost::signals2::connection _subscription;
std::promise<json::object> _promise;
};
auto state = std::make_shared<State>();
state->_subscription = onMessage.connect( //
[=, pred = std::forward<F>(pred)](json::object const& msg) {
if (pred(msg)) {
state->_promise.set_value(msg);
state->_subscription.disconnect();
}
});
return state->_promise.get_future();
}
现在主程序可以少写倒退了:
auto targets = client.Request({
{"id", id++},
{"method", "Target.getTargets"},
}).get().at("result").at("targetInfos");
auto pageTarget = r::find_if(targets.as_array(), [](auto& info) {
return info.at("type") == "page";
});
if (!pageTarget) {
std::cerr << "No page target\n";
return 0;
}
std::cout << "pageTarget " << *pageTarget << std::endl;
auto sessionId = client.Request(
{{"id", id++},
{"method", "Target.attachToTarget"},
{"params", json::object{
{"targetId", pageTarget->at("targetId")},
{"flatten", true},
},
}})
.get().at("result").at("sessionId");
std::cout << "sessionId: " << sessionId << std::endl;
auto response = client.Request({
{"sessionId", sessionId},
{"id", 1}, // IDs are independent between sessions
{"method", "Page.navigate"},
{"params", json::object{
{"url", "
}},
}) .get();
std::cout << "Navigation response: " << response << std::endl;
这导致输出如下:
-- trace {"id":1,"result":{"targetInfos":[{"targetId":"35BE8DA1EE5A0F51EDEF9AA71738968C","type":"background_page","title":"Gnom....
pageTarget {"targetId":"1F3A58D579C18DDD819EF46EBBB0AD4C","type":"page","title":"c++ - Boost Beast Websocket - Send and Read unt....
-- trace {"method":"Target.attachedToTarget","params":{"sessionId":"58931793102C2A5576E4D5D6CDC3D601","targetInfo":{"targetId":....
-- trace {"id":2,"result":{"sessionId":"58931793102C2A5576E4D5D6CDC3D601"}}
sessionId: "58931793102C2A5576E4D5D6CDC3D601"
-- trace {"id":1,"result":{"frameId":"1F3A58D579C18DDD819EF46EBBB0AD4C","loaderId":"9E70C5AAF0B5A503BA2770BB73A4FEC3"},"session....
Navigation response: {"id":1,"result":{"frameId":"1F3A58D579C18DDD819EF46EBBB0AD4C","loaderId":"9E70C5AAF0B5A503BA2770BB73A4FEC3....
关注后评论:
I would have one last question, if you do not mind: can I somehow use
std::future<T>::wait_until
so I can find out if the page was loaded completely (for example, by checking for a Network.loadingFinished
object)?
当然,只需编码即可:
{
    std::promise<void> promise;
    bool signalled = false; // written only by the slot below

    scoped_connection sub =
        client.onMessage.connect([&](json::object const& msg) {
            // if_contains returns nullptr when the message has no "method"
            // member, so the pointer must be checked before dereferencing.
            auto const* m = msg.if_contains("method");
            if (!signalled && m && *m == "Network.loadingFinished") {
                signalled = true; // a promise may be satisfied only once
                promise.set_value();
            }
        });

    auto loadingFinished = promise.get_future();
    loadingFinished.wait(); // OR:
    loadingFinished.wait_for(5s); // OR:
    loadingFinished.wait_until(std::chrono::steady_clock::now() + 1min);
}
如果还想同时取回消息本身:
{
    std::promise<json::object> promise;
    bool signalled = false; // written only by the slot below

    scoped_connection sub =
        client.onMessage.connect([&](json::object const& msg) {
            // if_contains returns nullptr when the message has no "method"
            // member, so the pointer must be checked before dereferencing.
            auto const* m = msg.if_contains("method");
            if (!signalled && m && *m == "Network.loadingFinished") {
                signalled = true; // a promise may be satisfied only once
                promise.set_value(msg);
            }
        });

    // Blocks until the first matching message has been delivered.
    auto message = promise.get_future().get();
}
当然,你可以(也应该)考虑把这些再封装成类的方法。
UPDATE - I have since refactored the original futures code to use these as building blocks (Expect and Send together make Request).
现在你可以这样写:
// Subscribe for the first "Network.loadingFinished" message, then block
// until it arrives and print it.
auto pendingLoad     = client.Expect(isMethod("Network.loadingFinished"));
auto loadingFinished = pendingLoad.get();
std::cout << "Got: " << loadingFinished << "\n";
当然,假设有一个像这样的小帮手:
// Predicate factory: isMethod(name) yields a predicate that is true for
// messages having a "method" member equal to `name`, false otherwise
// (including when the member is absent).
auto isMethod = [](auto value) {
    return [value](json::object const& msg) {
        auto const* method = msg.if_contains("method");
        if (!method)
            return false;
        return *method == value;
    };
};
作为奖励,持续监控特定消息:
enum ActionResult { ContinueMonitoring, StopMonitoring };
template <typename A, typename F>
auto Monitor(A action, F&& filter = [](auto&&) noexcept { return true; })
{
struct State {
boost::signals2::connection _subscription;
std::promise<json::object> _promise;
};
auto state = std::make_shared<State>();
auto stop = [state] { state->_subscription.disconnect(); };
state->_subscription = onMessage.connect( //
[=, filter = std::forward<F>(filter)](json::object const& msg) {
if (filter(msg) && StopMonitoring == action(msg))
stop();
});
return stop; // gives the caller an "external" way to stop the monitor
}
一个人为的用法示例:
// Monitor until 3 messages having an id divisible by 7 have been received.
std::atomic_int count = 0;

// Accepts messages that carry an "id" member whose value is a multiple of 7.
auto const hasIdDivisibleBy7 = [](json::object const& msg) {
    auto const* id = msg.if_contains("id");
    if (!id)
        return false;
    return id->as_int64() % 7 == 0;
};

// Prints each accepted message and asks to stop once three were seen.
auto const reportAndCount = [&count](json::object const& msg) {
    std::cout << "Divisable by 7: " << msg << "\n";
    if (++count >= 3)
        return CDPClient::StopMonitoring;
    return CDPClient::ContinueMonitoring;
};

auto stopMonitor = client.Monitor(reportAndCount, hasIdDivisibleBy7);

std::this_thread::sleep_for(5s);
stopMonitor(); // even if 3 messages had not been reached, stop the monitor
std::cout << count << " messages having an id divisable by 7 had been received in 5s\n";
完整代码清单(futures 版本)
遗憾的是,它超出了 Compiler Explorer 的限制:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/json.hpp>
//#include <boost/json/src.hpp> // for header-only
#include <boost/signals2.hpp>
#include <atomic>
#include <cassert>
#include <chrono>
#include <deque>
#include <future>
#include <initializer_list>
#include <iostream>
#include <memory>
#include <ranges>
#include <string>
#include <thread>
#include <utility>
namespace json = boost::json;
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
namespace r = std::ranges;
// Diagnostic stream used by the client's internal logging. It is constructed
// without a stream buffer, so insertions produce no output; pass the buffer
// named in the trailing comment instead to route diagnostics to std::cerr.
static std::ostream debug(nullptr); // std::cerr.rdbuf()
static const auto filtered(json::object const& obj,
std::initializer_list<json::string_view> props)
{
boost::json::object result;
for (auto prop : props)
if (auto const* v = obj.if_contains(prop))
result[prop] = *v;
return result;
}
using namespace std::chrono_literals;
using boost::signals2::scoped_connection;
using boost::system::error_code;
using net::ip::tcp;
// Sends a WebSocket message and prints the response
class CDPClient {
websocket::stream<tcp::socket> ws_;
public:
using executor_type = net::any_io_executor;
executor_type get_executor() { return ws_.get_executor(); }
// Resolver and socket require an io_context
explicit CDPClient(executor_type ex) : ws_(make_strand(ex)) {}
// call backs are on the strand, not on the main thread
boost::signals2::signal<void(json::object const&)> onMessage;
// public functions not assumed to be on the strand
void Connect(std::string const& host, std::string const& port, std::string const& path)
{
post(get_executor(), [=, this] {
tcp::resolver resolver_(get_executor());
// TODO async_connect prevents potential blocking wait
// TODO async_handshake (idem)
auto ep = net::connect(ws_.next_layer(), //
resolver_.resolve(host, port));
ws_.handshake(host + ':' + std::to_string(ep.port()), path);
do_receive_loop();
});
}
void Send(json::object const& cmd)
{
post(get_executor(), [text = serialize(cmd), this] {
outbox_.push_back(text);
if (outbox_.size() == 1) // not already sending?
do_send_loop();
});
}
template <typename F> std::future<json::object> Expect(F&& pred)
{
struct State {
boost::signals2::connection _subscription;
std::promise<json::object> _promise;
};
auto state = std::make_shared<State>();
state->_subscription = onMessage.connect( //
[=, pred = std::forward<F>(pred)](json::object const& msg) {
if (pred(msg)) {
state->_promise.set_value(msg);
state->_subscription.disconnect();
}
});
return state->_promise.get_future();
}
static json::object msgId(json::object const& message) {
return filtered(message, {"id", "sessionId"}); // TODO more ?
};
std::future<json::object> Request(json::object const& cmd)
{
auto fut = Expect([id = msgId(cmd)](json::object const& resp) {
return msgId(resp) == id;
});
Send(cmd);
return fut;
}
enum ActionResult { ContinueMonitoring, StopMonitoring };
template <typename A, typename F>
auto Monitor(A action, F&& filter = [](auto&&) noexcept { return true; })
{
struct State {
boost::signals2::connection _subscription;
std::promise<json::object> _promise;
};
auto state = std::make_shared<State>();
auto stop = [state] { state->_subscription.disconnect(); };
state->_subscription = onMessage.connect( //
[=, filter = std::forward<F>(filter)](json::object const& msg) {
if (filter(msg) && StopMonitoring == action(msg))
stop();
});
return stop; // gives the caller an "external" way to stop the monitor
}
void CloseConnection() {
post(get_executor(), [this] {
ws_.next_layer().cancel();
ws_.async_close( //
websocket::close_code::normal, [this](error_code ec) {
debug << "CloseConnection (" << ec.message() << ")" << std::endl;
onMessage.disconnect_all_slots();
});
});
}
private:
// do_XXXX functions assumed to be on the strand
beast::flat_buffer inbox_;
void do_receive_loop() {
debug << "do_receive_loop..." << std::endl;
ws_.async_read(inbox_, [this](error_code ec, size_t n) {
debug << "Received " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec) {
auto text = inbox_.cdata();
auto parsed = json::parse(
{buffer_cast<char const*>(text), text.size()}, ec);
inbox_.clear();
if (!ec) {
assert(parsed.is_object());
onMessage(parsed.as_object()); // exceptions will blow up
do_receive_loop();
} else {
debug << "Ignore failed parse (" << ec.message() << ")" << std::endl;
}
}
});
}
std::deque<std::string> outbox_;
void do_send_loop() {
debug << "do_send_loop " << outbox_.size() << std::endl;
if (outbox_.empty())
return;
ws_.async_write( //
net::buffer(outbox_.front()), [this](error_code ec, size_t n) {
debug << "Sent " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec) {
outbox_.pop_front();
do_send_loop();
}
});
}
};
int main()
{
net::thread_pool ioc(1);
CDPClient client(ioc.get_executor());
client.Connect("localhost", "9222", "/devtools/browser/bb8efece-b445-42d0-a4cc-349fccd8514d");
auto trace = client.onMessage.connect([&](json::object const& obj) {
std::cerr << " -- trace " << obj << std::endl;
});
unsigned id = 1; // TODO make per session
auto targets = client.Request({
{"id", id++},
{"method", "Target.getTargets"},
}).get().at("result").at("targetInfos");
auto pageTarget = r::find_if(targets.as_array(), [](auto& info) {
return info.at("type") == "page";
});
if (!pageTarget) {
std::cerr << "No page target\n";
return 0;
}
std::cout << "pageTarget " << *pageTarget << std::endl;
auto sessionId = client.Request(
{{"id", id++},
{"method", "Target.attachToTarget"},
{"params", json::object{
{"targetId", pageTarget->at("targetId")},
{"flatten", true},
},
}})
.get().at("result").at("sessionId");
std::cout << "sessionId: " << sessionId << std::endl;
auto response = client.Request({
{"sessionId", sessionId},
{"id", 1}, // IDs are independent between sessions
{"method", "Page.navigate"},
{"params", json::object{
{"url", "
}},
}) .get();
std::cout << "Navigation response: " << response << std::endl;
auto isMethod = [](auto value) {
return [value](json::object const& msg) {
auto m = msg.if_contains("method");
return m && *m == value;
};
};
auto loadingFinished = client.Expect(isMethod("Network.loadingFinished")).get();
std::cout << "Got: " << loadingFinished << "\n";
// monitor until 3 messages having an id divisable by 7 have been received
std::atomic_int count = 0;
auto stopMonitor = client.Monitor(
[&count](json::object const& msg) {
std::cout << "Divisable by 7: " << msg << "\n";
return ++count >= 3 ? CDPClient::StopMonitoring
: CDPClient::ContinueMonitoring;
},
[](json::object const& msg) {
auto id = msg.if_contains("id");
return id && (0 == id->as_int64() % 7);
});
std::this_thread::sleep_for(5s);
stopMonitor(); // even if 3 messages had not been reached, stop the monitor
std::cout << count << " messages having an id divisable by 7 had been received in 5s\n";
client.CloseConnection();
ioc.join();
}
¹ 期间还抽空吃了顿晚饭