How do I return the response to the caller asynchronously, using a final callback dispatched from the on_read handler?
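I need to expose an async REST API to C++ clients that internally uses boost::beast to send REST requests and receive responses.
The starting point is the http_client_async.cpp example.
The client will pass a callback function to this async API, and that callback needs to be invoked at the end of the REST operation, from the on_read() handler in http_client_async.cpp, so that the complete response is passed back to the caller.
How can I achieve this?
Referring to the example you mentioned:
Modify the session constructor to accept a callback that takes an HTTP status integer and a body string.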
typedef std::function<void(unsigned int, const std::string&)> CALLBACK;
CALLBACK _callback; // same name as used in the constructor and in on_read

explicit
session(net::io_context& ioc, CALLBACK& callback)
    : resolver_(net::make_strand(ioc))
    , stream_(net::make_strand(ioc))
    , _callback(callback)
{
}
Modify session::on_read to invoke the callback.
void
on_read(
    beast::error_code ec,
    std::size_t bytes_transferred)
{
    if(ec)
    {
        _callback(0, "");
    }
    else
    {
        _callback(_res.result_int(), _res.body());
    }
}
But is there any way to invoke this callback through Asio's io_context? I would like to call it asynchronously, because the callback is provided by the user and could block, which would block the io_context's thread as well. Ideally it would be scheduled on the io_context in the same way as the other handlers, such as on_read() and on_write().
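Yes. What you are after is the async_result protocol. I have some examples in other answers.
Here are the building blocks:
Storing the handler
In your "session" (let's rename it http_request_op and hide it in some detail namespace), you want to remember a completion handler.
Don't worry, nobody has to come up with such a handler by hand. We will add an initiating function, async_http_request, that makes it for you.
The end user may want to use a future or a coroutine (yield_context). Of course, they can also provide a plain callback if they prefer.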
using Response = http::response<http::string_body>;

template <typename Handler>
class http_request_op : public std::enable_shared_from_this<http_request_op<Handler> > {
    // ...
    Response res_;
    Handler handler_;
    // ...
  public:
    template <typename Executor>
    explicit http_request_op(Executor ex, Handler handler)
        : resolver_(ex),
          stream_(ex),
          handler_(std::move(handler))
    { }
    // ...
};
Now, in the final step, invoke handler_. For simplicity, I turned the fail helper function into a member function and named it complete:
void complete(beast::error_code ec, char const* what) {
    if (ec && what) {
        // TODO: A better idea would be to make a custom `Response` type that
        // has room for "fail stage"
        res_.reason(what);
    }
    // Post the handler instead of calling it inline; `self` keeps the
    // operation alive until the handler has run.
    post(stream_.get_executor(), [this, ec, self=this->shared_from_this()] {
        handler_(ec, std::move(res_));
    });
}
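To see what posting buys you compared with calling the handler directly, here is a minimal sketch that uses only the standard library, not Asio. TaskQueue and trace_posted_completion are hypothetical names: the first stands in for an executor, the second for an on_read-style handler that posts the user's handler. It shows that a posted handler only runs after the function that posted it has returned.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an executor: a FIFO queue of pending tasks.
class TaskQueue {
    std::queue<std::function<void()>> tasks_;

  public:
    // Queue a task; it does not run until run() reaches it.
    void post(std::function<void()> task) {
        assert(task); // an empty std::function cannot be invoked
        tasks_.push(std::move(task));
    }

    // Run queued tasks in order until none are left; returns how many ran.
    std::size_t run() {
        std::size_t count = 0;
        while (!tasks_.empty()) {
            std::function<void()> task = std::move(tasks_.front());
            tasks_.pop();
            task(); // a task may post further tasks
            ++count;
        }
        return count;
    }
};

// Records the order of events when an I/O handler posts the user's handler.
std::vector<std::string> trace_posted_completion() {
    std::vector<std::string> log;
    TaskQueue queue;

    queue.post([&log, &queue] {
        log.push_back("on_read begin");
        queue.post([&log] { log.push_back("user handler"); }); // deferred
        log.push_back("on_read end");
    });

    queue.run();
    return log;
}
```

Calling trace_posted_completion() records the events in the order "on_read begin", "on_read end", "user handler". Keep in mind that posting onto the same executor only defers the handler: it still runs on a thread that is running the io_context, so a handler that blocks still occupies that thread. If that is a concern, one option is to post onto a separate executor, such as a thread pool.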
Every place that previously checked ec and used fail now calls complete with the same ec. In addition, in on_read we add an unconditional completion:
void on_read(beast::error_code ec, size_t /*bytes_transferred*/) {
    if (ec)
        return complete(ec, "read");
    stream_.socket().shutdown(tcp::socket::shutdown_both, ec);
    // unconditional complete here
    return complete(ec, "shutdown");
}
The initiating function (async_http_request)
template <typename Context, typename Token>
auto async_http_request(Context& ctx, beast::string_view host, beast::string_view port, beast::string_view target, int version, Token&& token) {
    using result_type = typename net::async_result<std::decay_t<Token>, void(beast::error_code, Response)>;
    using handler_type = typename result_type::completion_handler_type;
    handler_type handler(std::forward<Token>(token));
    result_type result(handler);
    std::make_shared<detail::http_request_op<handler_type> >
        (make_strand(ctx), std::move(handler))
            ->start(host, port, target, version);
    return result.get();
}
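As you can see, this creates an async result, which makes a "handler" based on the token that was passed, kicks off the http_request_op, and returns the async result.
What is returned depends on the token that was passed; the Usage section shows each case.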
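How one initiating function can return different things is easier to see in a stripped-down model. The following is a sketch that uses only the standard library; it is not Asio's actual async_result. The names toy_async_result, as_future_t and toy_async_greet are hypothetical, and the "operation" completes immediately instead of asynchronously.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical tag type, standing in for a token such as net::use_future.
struct as_future_t {};

// Primary template: the token is an ordinary callable. It is used directly as
// the handler, and the initiating function returns void.
template <typename Token>
struct toy_async_result {
    using handler_type = Token;
    explicit toy_async_result(handler_type&) {}
    void get() {}
};

// Specialization: the token asks for a future, so the handler fulfils a
// promise and the initiating function returns the matching future.
template <>
struct toy_async_result<as_future_t> {
    struct handler_type {
        std::shared_ptr<std::promise<std::string>> promise;
        explicit handler_type(as_future_t)
            : promise(std::make_shared<std::promise<std::string>>()) {}
        void operator()(std::string value) {
            assert(promise);
            promise->set_value(std::move(value));
        }
    };

    std::future<std::string> future;
    explicit toy_async_result(handler_type& h) : future(h.promise->get_future()) {}
    std::future<std::string> get() { return std::move(future); }
};

// Toy initiating function: same shape as async_http_request, but the
// "operation" just invokes the handler right away.
template <typename Token>
auto toy_async_greet(std::string name, Token&& token) {
    using result_type  = toy_async_result<std::decay_t<Token>>;
    using handler_type = typename result_type::handler_type;

    handler_type handler(std::forward<Token>(token));
    result_type  result(handler);
    handler("hello " + name); // a real operation would invoke this later
    return result.get();      // void for callbacks, a future for as_future_t
}
```

With a lambda as the token, toy_async_greet("beast", callback) returns void and calls the lambda; with as_future_t{} it returns a std::future<std::string> instead. The caller picks the style and the initiating function stays the same.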
Usage
I'll show the various ways in which end users can choose to use this async_http_request initiating function:
Using a future
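// Pass the io_context itself: the Context& parameter cannot bind to the
// temporary returned by ioc.get_executor().
auto future = async_http_request(ioc, host, port, target, version, net::use_future);
ioc.run();
std::cout << future.get() << "\n";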
The return type is std::future<Response>.
The creation of the promise and setting the return value/exception information is magically handled by Asio.
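To take some of the magic out of that, here is a rough standard-library sketch of the kind of handler a future-style token implies. It is an illustration only, not Asio's implementation. PromiseHandler and complete_and_get are hypothetical names, and std::error_code with std::string stand in for beast::error_code and Response.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <string>
#include <system_error>
#include <utility>

// Hypothetical handler with the signature void(std::error_code, std::string).
// It owns a promise and fulfils it when the operation completes.
struct PromiseHandler {
    std::shared_ptr<std::promise<std::string>> promise =
        std::make_shared<std::promise<std::string>>();

    void operator()(std::error_code ec, std::string value) const {
        assert(promise);
        if (ec) // failure: the future rethrows this from get()
            promise->set_exception(std::make_exception_ptr(std::system_error(ec)));
        else    // success: the future returns the value from get()
            promise->set_value(std::move(value));
    }
};

// Completes a handler immediately and reports what the future yields:
// the value on success, or the error category name on failure.
std::string complete_and_get(std::error_code ec, std::string value) {
    PromiseHandler handler;
    std::future<std::string> future = handler.promise->get_future();
    handler(ec, std::move(value));
    try {
        return future.get();
    } catch (std::system_error const& e) {
        return std::string("exception from category ") + e.code().category().name();
    }
}
```

This mirrors what the demo output shows for the future case: a successful request yields the response from get(), while a failed one surfaces as a system_error exception.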
Using a coroutine/yield context:
net::spawn(ioc, [&ioc,args](net::yield_context yield) {
    try {
        auto host   = args[0];
        auto port   = args[1];
        auto target = args[2];
        int version = args[3]=="1.0"? 10 : 11;

        Response res = async_http_request(
                ioc,
                host, port, target, version,
                yield);

        std::cout << res << std::endl;
    } catch (boost::system::system_error const& se) {
        // no way to get at response here
        std::cout << "There was an error: " << se.code().message() << std::endl;
    }
});
ioc.run();
Here the return type is simply Response. Note that an exception is raised if an error condition is reported. Alternatively, pass an error_code variable:
beast::error_code ec;
Response res = async_http_request(
        ioc,
        host, port, target, version,
        yield[ec]);

std::cout << ec.message() << "\n" << res << std::endl;
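The difference between the two coroutine styles can be modelled in a few lines. This is a sketch using only the standard library, with hypothetical names (deliver, both_styles_agree); std::error_code and std::string stand in for beast::error_code and Response, and no coroutine is involved.

```cpp
#include <cassert>
#include <string>
#include <system_error>

// Hypothetical model of handing an (error_code, value) completion back to a
// waiting caller. Without an out-parameter an error becomes an exception, as
// with plain yield. With one, the error is stored and the value is still
// returned, as with yield[ec].
std::string deliver(std::error_code result, std::string value,
                    std::error_code* out_ec = nullptr) {
    if (out_ec) {
        *out_ec = result;
        return value;
    }
    if (result)
        throw std::system_error(result);
    return value;
}

// Exercises both styles for a timed-out operation; returns true if the
// exception style threw the same error that the error_code style stored.
bool both_styles_agree() {
    auto const timeout = std::make_error_code(std::errc::timed_out);

    std::error_code ec;
    std::string stage = deliver(timeout, "connect", &ec);
    assert(stage == "connect"); // the value is still available in this style

    try {
        deliver(timeout, "connect");
    } catch (std::system_error const& e) {
        return e.code() == ec;
    }
    return false;
}
```

This is why the error_code style can still print the failing stage from the response, while the catch block in the exception style has no response to inspect.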
Still using a callback
/*void*/ async_http_request(ioc, host, port, target, version,
    [](beast::error_code ec, Response const& res) {
        std::cout << ec.message() << "\n" << res << "\n";
    });
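The return value simply ends up being void.
Full demo code
There is no live demo, because no online compiler supports network requests, and the code also exceeds their compilation limits.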
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <chrono>
#include <iostream>
#include <memory>
#include <vector>

namespace beast = boost::beast;
namespace http  = beast::http;
namespace net   = boost::asio;
using tcp       = boost::asio::ip::tcp;

using Response = http::response<http::string_body>;

namespace detail {
    template <typename Handler>
    class http_request_op : public std::enable_shared_from_this<http_request_op<Handler> > {
        tcp::resolver resolver_;
        beast::tcp_stream stream_;
        beast::flat_buffer buffer_;
        http::request<http::empty_body> req_;
        Response res_;
        Handler handler_;

        // Bind a member function to a shared_ptr to this operation, so the
        // operation stays alive while the async call is pending.
        template <typename F>
        auto bind(F ptmf) { return beast::bind_front_handler(ptmf, this->shared_from_this()); }

        // Final step of every code path: post the user's handler.
        void complete(beast::error_code ec, char const* what) {
            if (ec && what) {
                // TODO: A better idea would be to make a custom `Response` type that
                // has room for "fail stage"
                res_.reason(what);
            }
            post(stream_.get_executor(), [this, ec, self=this->shared_from_this()] {
                handler_(ec, std::move(res_));
            });
        }

      public:
        template <typename Executor>
        explicit http_request_op(Executor ex, Handler handler)
            : resolver_(ex),
              stream_(ex),
              handler_(std::move(handler))
        { }

        void start(beast::string_view host, beast::string_view port, beast::string_view target, int version) {
            req_.version(version);
            req_.method(http::verb::get);
            req_.target(target);
            req_.set(http::field::host, host);
            req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);

            resolver_.async_resolve(host.to_string(), port.to_string(),
                bind_executor(stream_.get_executor(), bind(&http_request_op::on_resolve)));
        }

      private:
        void on_resolve(beast::error_code ec, tcp::resolver::results_type results) {
            if (ec)
                return complete(ec, "resolve");

            stream_.expires_after(std::chrono::seconds(30));
            stream_.async_connect(results, bind(&http_request_op::on_connect));
        }

        void on_connect(beast::error_code const& ec, tcp::endpoint const&) {
            if (ec)
                return complete(ec, "connect");

            stream_.expires_after(std::chrono::seconds(30));
            http::async_write(stream_, req_, bind(&http_request_op::on_write));
        }

        void on_read(beast::error_code ec, size_t /*bytes_transferred*/) {
            if (ec)
                return complete(ec, "read");

            stream_.socket().shutdown(tcp::socket::shutdown_both, ec);
            // unconditional complete here
            return complete(ec, "shutdown");
        }

        void on_write(beast::error_code ec, size_t /*bytes_transferred*/) {
            if (ec)
                return complete(ec, "write");

            http::async_read(stream_, buffer_, res_, bind(&http_request_op::on_read));
        }
    };
}

template <typename Context, typename Token>
auto async_http_request(Context& ctx, beast::string_view host, beast::string_view port, beast::string_view target, int version, Token&& token) {
    using result_type = typename net::async_result<std::decay_t<Token>, void(beast::error_code, Response)>;
    using handler_type = typename result_type::completion_handler_type;
    handler_type handler(std::forward<Token>(token));
    result_type result(handler);

    std::make_shared<detail::http_request_op<handler_type> >
        (make_strand(ctx), std::move(handler))
            ->start(host, port, target, version);

    return result.get();
}

int main(int argc, char** argv) {
    std::vector<beast::string_view> args{argv+1, argv+argc};
    if (args.size() == 3) args.push_back("1.1");

    if (args.size() != 4) {
        std::cerr << "Usage: http-client-async <host> <port> <target> [<HTTP "
                     "version: 1.0 or 1.1(default)>]\n"
                  << "Example:\n"
                  << " http-client-async www.example.com 80 /\n"
                  << " http-client-async www.example.com 80 / 1.0\n";
        return 255;
    }

    auto host   = args[0];
    auto port   = args[1];
    auto target = args[2];
    int version = args[3]=="1.0"? 10 : 11;

    net::io_context ioc;

    // Coroutine, errors reported as exceptions
    net::spawn(ioc, [=,&ioc](net::yield_context yield) {
        try {
            Response res = async_http_request(
                    ioc,
                    host, port, target, version,
                    yield);
            std::cout << "From coro (try/catch): " << res.reason() << std::endl;
        } catch (boost::system::system_error const& se) {
            // no way to get at response here
            std::cout << "coro exception: " << se.code().message() << std::endl;
        }
    });

    // Coroutine, errors reported through an error_code
    net::spawn(ioc, [=,&ioc](net::yield_context yield) {
        beast::error_code ec;
        Response res = async_http_request(
                ioc,
                host, port, target, version,
                yield[ec]);
        std::cout << "From coro: " << ec.message() << ", " << res.reason() << "\n";
    });

    // Plain callback
    /*void*/ async_http_request(ioc, host, port, target, version,
        [](beast::error_code ec, Response const& res) {
            std::cout << "From callback: " << ec.message() << ", " << res.reason() << "\n";
        });

    // Future
    auto future = async_http_request(ioc, host, port, target, version, net::use_future);

    ioc.run();

    try {
        std::cout << "From future: " << future.get().reason() << "\n";
    } catch (boost::system::system_error const& se) {
        std::cout << "future exception: " << se.code().message() << std::endl;
    }
}
Output for successful and failing requests:
$ ./sotest www.example.com 80 / 1.1
From callback: Success, OK
From coro: Success, OK
From coro (try/catch): OK
From future: OK
$ ./sotest www.example.com 81 / 1.1
From callback: The socket was closed due to a timeout, connect
coro exception: The socket was closed due to a timeout
From coro: The socket was closed due to a timeout, connect
From future: future exception: The socket was closed due to a timeout
$ ./sotest www.example.cough 80 / 1.1
From callback: Host not found (authoritative), resolve
coro exception: Host not found (authoritative)
From coro: Host not found (authoritative), resolve
From future: future exception: Host not found (authoritative)
$ ./sotest www.example.com rhubarb / 1.1
From callback: Service not found, resolve
coro exception: Service not found
From coro: Service not found, resolve
From future: future exception: Service not found
Note that the timeout example of course takes about 30 seconds in total, because everything runs asynchronously.
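The "about 30 seconds in total" figure follows from all four requests having their timeouts pending at the same time. A small deterministic model makes that concrete. This sketch uses only the standard library and a virtual clock; VirtualLoop and elapsed_for_concurrent_timeouts are hypothetical names, and only the timers are modelled, not the network I/O.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical single-threaded event loop with a virtual clock (in seconds).
class VirtualLoop {
    struct Timer {
        int deadline;
        std::function<void()> fn;
    };
    struct Later {
        bool operator()(Timer const& a, Timer const& b) const {
            return a.deadline > b.deadline; // earliest deadline on top
        }
    };
    std::priority_queue<Timer, std::vector<Timer>, Later> timers_;
    int now_ = 0;

  public:
    // Arrange for fn to run once `seconds` have passed on the virtual clock.
    void expires_after(int seconds, std::function<void()> fn) {
        timers_.push(Timer{now_ + seconds, std::move(fn)});
    }

    // Fire timers in deadline order, jumping the clock forward as needed.
    // Returns the virtual time at which the last timer fired.
    int run() {
        while (!timers_.empty()) {
            Timer next = timers_.top();
            timers_.pop();
            now_ = next.deadline;
            next.fn();
        }
        return now_;
    }
};

// Starts `operations` timeouts at time zero and reports the total elapsed
// virtual time once all of them have fired.
int elapsed_for_concurrent_timeouts(int operations, int timeout_seconds) {
    VirtualLoop loop;
    int fired = 0;
    for (int i = 0; i < operations; ++i)
        loop.expires_after(timeout_seconds, [&fired] { ++fired; });

    int elapsed = loop.run();
    assert(fired == operations);
    return elapsed;
}
```

In this model, elapsed_for_concurrent_timeouts(4, 30) is 30 rather than 120, because the four deadlines overlap instead of queueing up behind each other.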