Sending HTTP requests from multiple threads using boost::asio. How to handle responses serially
For a client application, I want to serialize HTTP POST requests using a strand, to avoid overlapping writes/responses to/from the server.
The serialization is done with boost::asio by invoking the method as a callback from strand.post, as shown below. Note that each HTTP session (writing the request and reading its response) is done in a separate callback. (Also note that inside the post, the HTTP operations are done synchronously.)
boost::asio::io_context::strand strand_;

void Send(
    boost::beast::http::request<boost::beast::http::string_body>& req) {
    strand_.post([=]() {
        boost::beast::http::write(stream_, req);
        ...
        boost::beast::http::response<boost::beast::http::dynamic_body> res;
        boost::beast::flat_buffer buffer;
        boost::beast::error_code ec;
        boost::beast::http::read(stream_.socket(), buffer, res, ec);
    });
}
As you can see from my example, the response is also read inside the post callback. My question is whether this is really necessary, given that I don't care about the order in which the responses are read. Can I assume that after each boost::beast::http::write(stream_, req);
to the server, the data is already waiting in the rx buffer queue, so another thread could read the responses one by one?
Thanks!
You are not actually reading asynchronously, so you're not really synchronizing much with the strand. The only thing being synchronized is access to the stream_/socket.
Now, it might be a good idea to do everything synchronously. In that case, I'd suggest you don't need any threads at all, and hence no strand to begin with.
As soon as you do have strands/threads performing significant work, there is the potential of blocking the service threads. Consider what happens when a web server takes a second to respond. That is a very long time in computer terms.
If you do as many simultaneous requests as you have threads (which could typically be low, e.g. 4), nothing else can make progress on the io service, negating the very purpose of ASIO: asynchronous I/O.
Let me quickly fix some minor issues in the code from your question and make it self-contained: Live On Coliru
#include <boost/beast/http.hpp>
#include <boost/beast.hpp>
#include <boost/asio.hpp>
#include <iostream>

using boost::asio::ip::tcp;
namespace beast = boost::beast;
namespace http  = beast::http;

using Context = boost::asio::io_context;
using Strand  = boost::asio::strand<Context::executor_type>;

struct Demo {
    using Request = http::request<http::string_body>;

    Demo(Context& ctx, tcp::endpoint ep) //
        : strand_(ctx.get_executor())
    {
        stream_.connect(ep);
    }

    void Send(Request const& req)
    {
        post(strand_, [=, this]() {
            // prepare request ...
            http::write(stream_, req);
            //...
            http::response<http::dynamic_body> res;
            beast::flat_buffer buffer;
            beast::error_code ec;
            http::read(stream_, buffer, res, ec);

            std::cout << res << "\n";
        });
    }

  private:
    Strand      strand_;
    tcp::socket stream_{strand_};
};

int main() {
    Context io;
    Demo x(io, {{}, 80});

    Demo::Request req{http::verb::get, "/", 10};
    req.prepare_payload();
    x.Send(req);

    io.run();
}
Improving
I would suggest a safe asynchronous interface. I.e. you cannot be sure that no new request gets started on the same socket before the previous one completed, so you need a queue:
void Send(Request req) {
    post(strand_, [this, req = std::move(req)]() mutable {
        _outgoing.push_back(std::move(req));

        if (_outgoing.size() == 1) // no pending
            ServiceRequestQueue();
    });
}
Now, all the logic you had moves into the request loop, but asynchronously:
void ServiceRequestQueue()
{
    http::async_write( //
        stream_, _outgoing.front(), [this](beast::error_code ec, size_t) {
            if (ec) {
                std::cerr << "Request cannot be sent: " << ec.message() << std::endl;
                return;
            }

            // receive response
            _incoming.clear();
            _incoming.body().clear();

            http::async_read( //
                stream_, buffer, _incoming,
                [this](beast::error_code ec, size_t) {
                    if (ec) {
                        std::cerr << "Response cannot be received: "
                                  << ec.message() << std::endl;
                        return;
                    }

                    // std::cout << _incoming.base() << "\n";
                    std::cout << stream_.remote_endpoint() << " "
                              << _incoming.result() << " "
                              << _incoming.body().size() << "\n";

                    // request done
                    _outgoing.pop_front();

                    // continue if more queued
                    if (not _outgoing.empty())
                        ServiceRequestQueue();
                });
        });
}
You might want to split some of the completion handlers into separate functions, or do something useful with the requests.
int main() {
    Context io;
    Demo example_com { io, "93.184.216.34", 80 };
    Demo coliru      { io, "173.203.57.63", 80 };
    Demo localhost   { io, "127.0.0.1",     80 };

    // queue many requests before service start
    auto queue10 = [](Demo& client, std::string hostname, int version) {
        Demo::Request req{http::verb::get, "/", version};
        req.set(http::field::host, hostname);
        req.prepare_payload();

        for (int i = 0; i < 10; ++i)
            client.Send(req);
    };

    queue10(example_com, "www.example.com", 11);
    queue10(coliru, "coliru.stacked-crooked.com", 11);
    queue10(localhost, "sehe.nl", 10);

    // start service
    io.run();
}
Which prints, on my system:
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
93.184.216.34:80 OK 1256
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
Note that if you create many requests simultaneously (e.g. even before running the io_context
at all), you can observe that the separate HTTP clients work in overlapping fashion.
Advanced
If you really wanted a function that initiates a request and allows you to consume the response in a completion handler, consider extending the interface like this:
template <typename Token>
void async_send(Request req, Token&& token) {
    using result_type = typename boost::asio::async_result<
        std::decay_t<Token>, void(beast::error_code, Response)>;
    using handler_type = typename result_type::completion_handler_type;
    handler_type handler(std::forward<Token>(token));
    result_type  result(handler);

    struct Op {
        Request      req;
        Response     res;
        handler_type handler;

        Op(Request&& r, handler_type&& h)
            : req(std::move(r))
            , handler(std::move(h))
        {
        }

        bool check(beast::error_code ec, bool force_completion = false) {
            if (ec || force_completion)
                std::move(handler)(ec, std::move(res));
            return !ec.failed();
        }
    };

    auto op = std::make_shared<Op>(std::move(req), std::move(handler));

    post(strand_, [this, op] {
        http::async_write( //
            stream_, op->req,
            [this, op](beast::error_code ec, size_t) mutable {
                if (op->check(ec))
                    http::async_read(stream_, buffer, op->res,
                                     [op](beast::error_code ec, size_t) {
                                         op->check(ec, true);
                                     });
            });
    });

    return result.get();
}
Note that this moves the responsibility for avoiding overlapping requests per client back to the caller. So, start some request chains, like:
// queue several request chains before service start
AsyncRequestChain(10, example_com, "www.example.com");
AsyncRequestChain(10, coliru, "coliru.stacked-crooked.com");
AsyncRequestChain(10, localhost, "sehe.nl");
// start service
io.run();
With the chain itself being:
void AsyncRequestChain(unsigned n, Demo& client, std::string hostname)
{
    if (!n)
        return;

    Demo::Request req{http::verb::get, "/", 11};
    req.set(http::field::host, hostname);
    req.prepare_payload();

    client.async_send( //
        req, [=, &client](beast::error_code ec, Demo::Response&& res) {
            std::cout << hostname << ": " << ec.message();
            if (!ec)
                std::cout << " " << res.result() //
                          << " " << res.body().size();
            std::cout << std::endl;

            // continue with next iteration
            AsyncRequestChain(n - 1, client, hostname);
        });
}
Which, on my machine, prints:
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
www.example.com: Success OK 1256
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
www.example.com: Success OK 1256
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616