Asio 如何编写自定义 AsyncStream?
Asio How to write a custom AsyncStream?
我实际上已经设法编写了一个可用的 AsyncStream。但是,我不确定我是否按照预期的方式进行了操作。
我的主要问题是:get_executor()
函数应该 return 是哪个执行程序?
在实施过程中出现了几个问题。我用 Q<index>:
标记了它们。 (我会在编辑时保持索引稳定。)非常感谢他们的回答。
我尽量shorten/simplify这个例子。它确实可以正确编译和执行。
#include <iostream>
#include <syncstream>
#include <thread>
#include <coroutine>
#include <future>
#include <random>
#include <string>
#include <memory>
#include <boost/asio.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <fmt/format.h>
inline std::osyncstream tout(const std::string & tag = "") {
auto hash = std::hash<std::thread::id>{}(std::this_thread::get_id());
auto hashStr = fmt::format("T{:04X} ", hash >> (sizeof(hash) - 2) * 8); // only display 2 bytes
auto stream = std::osyncstream(std::cout);
stream << hashStr;
if (not tag.empty())
stream << tag << " ";
return stream;
}
namespace asio = boost::asio;
template <typename Executor>
requires asio::is_executor<Executor>::value // Q1: Is this the correct way to require that Executor actually is an executor?
// I can't replace typename as there is no concept for Executors.
class Service : public std::enable_shared_from_this<Service<Executor>> {
template<typename CallerExecutor, typename ServiceExecutor>
// requires asio::is_executor<CallerExecutor>::value && asio::is_executor<ServiceExecutor>::value
friend class MyAsyncStream;
/// Data sent to the service
std::string bufferIn;
/// Data produced by the service
std::string bufferOut;
/// The strand used to avoid concurrent execution if the passed executor is backed by multiple threads.
asio::strand<Executor> strand;
/// Used to slow the data consumption and generation
asio::steady_timer timer;
/// Used to generate data
std::mt19937 gen;
///
constexpr static const char charset[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
template<typename URBG>
static std::string gen_string(std::size_t length, URBG &&g) {
std::string result;
result.resize(length);
std::sample(std::cbegin(charset),
std::cend(charset),
std::begin(result),
std::intptr_t(length),
std::forward<URBG>(g));
return result;
}
static const constexpr auto MAX_OPS = 7;
asio::awaitable<void> main(std::shared_ptr<Service> captured_self) {
const constexpr auto TAG = "SrvCo";
auto exe = co_await asio::this_coro::executor;
auto use_awaitable = asio::bind_executor(exe, asio::use_awaitable);
for (size_t ops = 0; ops < MAX_OPS; ops++) {
timer.expires_after(std::chrono::milliseconds(1000));
co_await timer.async_wait(use_awaitable);
tout(TAG) << "Ops " << ops << std::endl;
bufferOut += gen_string(8, gen);
tout(TAG) << "Produced: " << bufferOut << std::endl;
auto consumed = std::string_view(bufferIn).substr(0, 4);
tout(TAG) << "Consumed: " << consumed << std::endl;
bufferIn.erase(0, consumed.size());
}
tout(TAG) << "Done" << std::endl;
}
std::once_flag initOnce;
public:
explicit Service(Executor && exe) : strand{asio::make_strand(exe)}, timer{exe.context()} {}
void init() {
std::call_once(initOnce, [this]() {
asio::co_spawn(strand, main(this->shared_from_this()), asio::detached);
});
}
};
/// https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/AsyncReadStream.html
template<typename CallerExecutor, typename ServiceExecutor>
// requires asio::is_executor<CallerExecutor>::value && asio::is_executor<ServiceExecutor>::value // Q2: Q1 is working why isn't this working with two Types?
class MyAsyncStream {
typedef void async_rw_handler(boost::system::error_code, size_t);
/// Holds the callers executor.
/// Q3: Should this field even exist?
CallerExecutor executor;
/// Use a weak_ptr to behave like a file descriptor.
std::weak_ptr<Service<ServiceExecutor>> serviceRef;
public:
explicit MyAsyncStream(std::shared_ptr<Service<ServiceExecutor>> & service, CallerExecutor & exe) : executor{exe}, serviceRef{service} {}
/// Needed by the stream specification.
typedef CallerExecutor executor_type;
/**
* Q4: Which executor should this function return? The CallerExecutor or the ServiceExecutor or something different.
* In this example it is never called. However it is needed by the stream specification. https://www.boost.org/doc/libs/1_79_0/doc/html/boost_asio/reference/AsyncReadStream.html
* I really don't want to leak the ServiceExecutor to library users.
* @return Returns the executor supplied in the constructor.
*/
auto get_executor() {
tout() << "GETTING EXE" << std::endl;
return executor;
}
template<typename MutableBufferSequence,
asio::completion_token_for<async_rw_handler>
CompletionToken = typename asio::default_completion_token<CallerExecutor>::type>
requires asio::is_mutable_buffer_sequence<MutableBufferSequence>::value
auto async_read_some(const MutableBufferSequence &buffer,
CompletionToken &&token = typename asio::default_completion_token<CallerExecutor>::type()) {
return asio::async_initiate<CompletionToken, async_rw_handler>([&](auto completion_handler) { // Q5: Can I avoid this async_initiate somehow?
BOOST_ASIO_READ_HANDLER_CHECK(CompletionToken, completion_handler) type_check; // I tried using co_spawn directly without success.
asio::co_spawn(
asio::get_associated_executor(completion_handler), // Q6-1: should I use get_executor() here? Currently, I just get the callers executor.
[&, buffer = std::move(buffer), completion_handler = std::forward<CompletionToken>(completion_handler)]
() mutable -> asio::awaitable<void> {
const constexpr auto TAG = "ARS";
auto callerExe = co_await asio::this_coro::executor;
auto to_caller = asio::bind_executor(callerExe, asio::use_awaitable);
auto service = serviceRef.lock();
if (service == nullptr) {
std::move(completion_handler)(asio::error::bad_descriptor, 0);
co_return;
}
auto to_service = asio::bind_executor(service->strand, asio::use_awaitable);
co_await asio::post(to_service);
tout(TAG) << "performing read" << std::endl;
auto buf_begin = asio::buffers_begin(buffer);
auto buf_end = asio::buffers_end(buffer);
boost::system::error_code err = asio::error::fault;
size_t it = 0;
while (!service->bufferOut.empty()) {
if (buf_begin == buf_end) {
// error the buffer is smaller than the request read amount
err = asio::error::no_buffer_space;
goto completion;
}
*buf_begin++ = service->bufferOut.at(0);
service->bufferOut.erase(0, 1);
it++;
}
err = asio::stream_errc::eof;
completion:
co_await asio::post(to_caller); // without this call the function returns on the wrong thread
tout(TAG) << "read done returned" << std::endl;
std::move(completion_handler)(err, it);
}, asio::detached);
}, token);
}
template<typename ConstBufferSequence,
asio::completion_token_for <async_rw_handler>
CompletionToken = typename asio::default_completion_token<CallerExecutor>::type>
requires asio::is_const_buffer_sequence<ConstBufferSequence>::value
auto async_write_some(const ConstBufferSequence &buffer,
CompletionToken &&token = typename asio::default_completion_token<CallerExecutor>::type()) {
return asio::async_initiate<CompletionToken, async_rw_handler>([&](auto completion_handler) {
BOOST_ASIO_WRITE_HANDLER_CHECK(CompletionToken, completion_handler) type_check;
asio::co_spawn(
asio::get_associated_executor(completion_handler), // Q6-2: should I use get_executor() here? Currently, I just get the callers executor.
[&, buffer = std::move(buffer), completion_handler = std::forward<CompletionToken>(completion_handler)]
() mutable -> asio::awaitable<void> {
const constexpr auto TAG = "AWS";
auto callerExe = co_await asio::this_coro::executor;
auto to_caller = asio::bind_executor(callerExe, asio::use_awaitable);
auto service = serviceRef.lock();
if (service == nullptr) {
std::move(completion_handler)(asio::error::bad_descriptor, 0);
co_return;
}
auto to_service = asio::bind_executor(service->strand, asio::use_awaitable);
co_await asio::post(to_service);
tout(TAG) << "performing write" << std::endl;
auto buf_begin = asio::buffers_begin(buffer);
auto buf_end = asio::buffers_end(buffer);
boost::system::error_code err = asio::error::fault;
size_t it = 0;
while (buf_begin != buf_end) {
service->bufferIn.push_back(static_cast<char>(*buf_begin++));
it++;
}
err = asio::stream_errc::eof;
completion:
co_await asio::post(to_caller); // without this call the function returns on the wrong thread
tout(TAG) << "write done returned" << std::endl;
std::move(completion_handler)(err, it);
}, asio::detached);
}, token);
}
};
asio::awaitable<int> mainCo() {
const constexpr auto TAG = "MainCo";
auto exe = co_await asio::this_coro::executor;
auto use_awaitable = asio::bind_executor(exe, asio::use_awaitable);
auto as_tuple = asio::experimental::as_tuple(use_awaitable);
auto use_future = asio::use_future;
auto timer = asio::steady_timer(exe);
asio::thread_pool servicePool{1};
co_await asio::post(asio::bind_executor(servicePool, asio::use_awaitable));
tout() << "ServiceThread run start" << std::endl;
co_await asio::post(use_awaitable);
auto service = std::make_shared<Service<boost::asio::thread_pool::basic_executor_type<std::allocator<void>, 0> >>(servicePool.get_executor());
service->init();
auto stream = MyAsyncStream{service, exe};
for (size_t it = 0; it < 4; it++) {
{
std::vector<char> dataBackend;
auto dynBuffer = asio::dynamic_buffer(dataBackend, 50);
auto [ec, n] = co_await asio::async_read(stream, dynBuffer, as_tuple); // Q7-1: Can I avoid using as_tuple here?
tout(TAG) << "read done: " << std::endl
<< "n: " << n << std::endl
<< "msg: " << std::string{dataBackend.begin(), dataBackend.end()} << std::endl
<< "ec: " << ec.message()
<< std::endl;
}
{
auto const constexpr str = std::string_view{"HelloW"};
std::vector<char> dataBackend{str.begin(), str.end()};
auto dynBuffer = asio::dynamic_buffer(dataBackend, 50);
auto [ec, n] = co_await asio::async_write(stream, dynBuffer, as_tuple); // Q7-2: Can I avoid using as_tuple here?
tout(TAG) << "write done: " << std::endl
<< "n: " << n << std::endl
<< "msg: " << str << std::endl
<< "ec: " << ec.message()
<< std::endl;
}
timer.expires_after(std::chrono::milliseconds(2500));
co_await timer.async_wait(use_awaitable);
}
servicePool.join();
tout(TAG) << "Normal exit" << std::endl;
co_return 0;
}
int main() {
asio::io_context appCtx;
auto fut = asio::co_spawn(asio::make_strand(appCtx), mainCo(), asio::use_future);
tout() << "MainThread run start" << std::endl;
appCtx.run();
tout() << "MainThread run done" << std::endl;
return fut.get();
}
Q1
我想看起来不错。但是,请参阅 Q2。
Q2
看起来它杀死了 AsyncStream
的 CTAD。如果我不得不猜测这是因为 ServiceExecutor
在 non-deduced 上下文中。手动帮助它可能会有所帮助,但请注意此处的第二个静态断言是如何失败的:
using ServiceExecutor = asio::thread_pool::executor_type;
using CallerExecutor = asio::any_io_executor;
static_assert(asio::is_executor<ServiceExecutor>::value);
static_assert(asio::is_executor<CallerExecutor>::value);
那是因为co_await this_coro::executor
returnsany_io_executor
,是不同“牌子”的executor。您需要使用 execution::is_executor<T>::value
进行检查。事实上,您可能希望像在 Asio 实现函数中那样进行兼容性检查:
(is_executor<Executor>::value || execution::is_executor<Executor>::value)
&& is_convertible<Executor, AwaitableExecutor>::value
PS:
It dawned on me that the non-deduced context is a symptom of
overly-specific template arguments. Just make AsyncStream<Executor, Service>
(why bother with the specific type arguments that are
implementation details of Service
?). That fixes the
CTAD (Live On Compiler Explorer)
template <typename CallerExecutor, typename Service>
requires my_is_executor<CallerExecutor>::value //
class MyAsyncStream {
Q3:这个字段应该存在吗?
CallerExecutor executor;
是的,这就是 IO 对象记住其绑定执行程序的方式。
Q4:那是你 return 调用者执行者所在的位置。
它没有在您的应用程序中调用,但它可能会被调用。如果您针对您的 IO 对象 (MyAsyncStream
) 调用任何组合操作(如 asio::async_read_until
),它会 - 默认情况下 - 运行 关联执行程序上的任何处理程序。这可能会添加正确性所需的行为(如处理程序序列化、工作跟踪等)。
像以往一样,处理程序可以绑定到另一个执行程序以覆盖它。
Q5 我不这么认为,除非您想强制使用 use_awaitable
(或兼容的)完成令牌。事实上,你 运行 内部的 coro 应该是调用者的实现细节。
Q6 是的,但不是而是关闭。我假设您需要使用 IO 对象的执行程序作为后备:
asio::get_associated_executor(
completion_handler, this->get_executor())
Q7-1:我可以避免在这里使用as_tuple吗?
auto [ec, n] = co_await asio::async_read(stream, dynBuffer, as_tuple);
我想如果你能“仅仅”处理 system_error
个异常:
auto n = co_await asio::async_read(stream, dynBuffer, use_awaitable);
我实际上已经设法编写了一个可用的 AsyncStream。但是,我不确定我是否按照预期的方式进行了操作。
我的主要问题是:get_executor()
函数应该 return 是哪个执行程序?
在实施过程中出现了几个问题。我用 Q<index>:
标记了它们。 (我会在编辑时保持索引稳定。)非常感谢他们的回答。
我尽量shorten/simplify这个例子。它确实可以正确编译和执行。
#include <iostream>
#include <syncstream>
#include <thread>
#include <coroutine>
#include <future>
#include <random>
#include <string>
#include <memory>
#include <boost/asio.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <fmt/format.h>
inline std::osyncstream tout(const std::string & tag = "") {
auto hash = std::hash<std::thread::id>{}(std::this_thread::get_id());
auto hashStr = fmt::format("T{:04X} ", hash >> (sizeof(hash) - 2) * 8); // only display 2 bytes
auto stream = std::osyncstream(std::cout);
stream << hashStr;
if (not tag.empty())
stream << tag << " ";
return stream;
}
namespace asio = boost::asio;
template <typename Executor>
requires asio::is_executor<Executor>::value // Q1: Is this the correct way to require that Executor actually is an executor?
// I can't replace typename as there is no concept for Executors.
class Service : public std::enable_shared_from_this<Service<Executor>> {
template<typename CallerExecutor, typename ServiceExecutor>
// requires asio::is_executor<CallerExecutor>::value && asio::is_executor<ServiceExecutor>::value
friend class MyAsyncStream;
/// Data sent to the service
std::string bufferIn;
/// Data produced by the service
std::string bufferOut;
/// The strand used to avoid concurrent execution if the passed executor is backed by multiple threads.
asio::strand<Executor> strand;
/// Used to slow the data consumption and generation
asio::steady_timer timer;
/// Used to generate data
std::mt19937 gen;
///
constexpr static const char charset[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
template<typename URBG>
static std::string gen_string(std::size_t length, URBG &&g) {
std::string result;
result.resize(length);
std::sample(std::cbegin(charset),
std::cend(charset),
std::begin(result),
std::intptr_t(length),
std::forward<URBG>(g));
return result;
}
static const constexpr auto MAX_OPS = 7;
asio::awaitable<void> main(std::shared_ptr<Service> captured_self) {
const constexpr auto TAG = "SrvCo";
auto exe = co_await asio::this_coro::executor;
auto use_awaitable = asio::bind_executor(exe, asio::use_awaitable);
for (size_t ops = 0; ops < MAX_OPS; ops++) {
timer.expires_after(std::chrono::milliseconds(1000));
co_await timer.async_wait(use_awaitable);
tout(TAG) << "Ops " << ops << std::endl;
bufferOut += gen_string(8, gen);
tout(TAG) << "Produced: " << bufferOut << std::endl;
auto consumed = std::string_view(bufferIn).substr(0, 4);
tout(TAG) << "Consumed: " << consumed << std::endl;
bufferIn.erase(0, consumed.size());
}
tout(TAG) << "Done" << std::endl;
}
std::once_flag initOnce;
public:
explicit Service(Executor && exe) : strand{asio::make_strand(exe)}, timer{exe.context()} {}
void init() {
std::call_once(initOnce, [this]() {
asio::co_spawn(strand, main(this->shared_from_this()), asio::detached);
});
}
};
/// https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/AsyncReadStream.html
template<typename CallerExecutor, typename ServiceExecutor>
// requires asio::is_executor<CallerExecutor>::value && asio::is_executor<ServiceExecutor>::value // Q2: Q1 is working why isn't this working with two Types?
class MyAsyncStream {
typedef void async_rw_handler(boost::system::error_code, size_t);
/// Holds the callers executor.
/// Q3: Should this field even exist?
CallerExecutor executor;
/// Use a weak_ptr to behave like a file descriptor.
std::weak_ptr<Service<ServiceExecutor>> serviceRef;
public:
explicit MyAsyncStream(std::shared_ptr<Service<ServiceExecutor>> & service, CallerExecutor & exe) : executor{exe}, serviceRef{service} {}
/// Needed by the stream specification.
typedef CallerExecutor executor_type;
/**
* Q4: Which executor should this function return? The CallerExecutor or the ServiceExecutor or something different.
* In this example it is never called. However it is needed by the stream specification. https://www.boost.org/doc/libs/1_79_0/doc/html/boost_asio/reference/AsyncReadStream.html
* I really don't want to leak the ServiceExecutor to library users.
* @return Returns the executor supplied in the constructor.
*/
auto get_executor() {
tout() << "GETTING EXE" << std::endl;
return executor;
}
template<typename MutableBufferSequence,
asio::completion_token_for<async_rw_handler>
CompletionToken = typename asio::default_completion_token<CallerExecutor>::type>
requires asio::is_mutable_buffer_sequence<MutableBufferSequence>::value
auto async_read_some(const MutableBufferSequence &buffer,
CompletionToken &&token = typename asio::default_completion_token<CallerExecutor>::type()) {
return asio::async_initiate<CompletionToken, async_rw_handler>([&](auto completion_handler) { // Q5: Can I avoid this async_initiate somehow?
BOOST_ASIO_READ_HANDLER_CHECK(CompletionToken, completion_handler) type_check; // I tried using co_spawn directly without success.
asio::co_spawn(
asio::get_associated_executor(completion_handler), // Q6-1: should I use get_executor() here? Currently, I just get the callers executor.
[&, buffer = std::move(buffer), completion_handler = std::forward<CompletionToken>(completion_handler)]
() mutable -> asio::awaitable<void> {
const constexpr auto TAG = "ARS";
auto callerExe = co_await asio::this_coro::executor;
auto to_caller = asio::bind_executor(callerExe, asio::use_awaitable);
auto service = serviceRef.lock();
if (service == nullptr) {
std::move(completion_handler)(asio::error::bad_descriptor, 0);
co_return;
}
auto to_service = asio::bind_executor(service->strand, asio::use_awaitable);
co_await asio::post(to_service);
tout(TAG) << "performing read" << std::endl;
auto buf_begin = asio::buffers_begin(buffer);
auto buf_end = asio::buffers_end(buffer);
boost::system::error_code err = asio::error::fault;
size_t it = 0;
while (!service->bufferOut.empty()) {
if (buf_begin == buf_end) {
// error the buffer is smaller than the request read amount
err = asio::error::no_buffer_space;
goto completion;
}
*buf_begin++ = service->bufferOut.at(0);
service->bufferOut.erase(0, 1);
it++;
}
err = asio::stream_errc::eof;
completion:
co_await asio::post(to_caller); // without this call the function returns on the wrong thread
tout(TAG) << "read done returned" << std::endl;
std::move(completion_handler)(err, it);
}, asio::detached);
}, token);
}
template<typename ConstBufferSequence,
asio::completion_token_for <async_rw_handler>
CompletionToken = typename asio::default_completion_token<CallerExecutor>::type>
requires asio::is_const_buffer_sequence<ConstBufferSequence>::value
auto async_write_some(const ConstBufferSequence &buffer,
CompletionToken &&token = typename asio::default_completion_token<CallerExecutor>::type()) {
return asio::async_initiate<CompletionToken, async_rw_handler>([&](auto completion_handler) {
BOOST_ASIO_WRITE_HANDLER_CHECK(CompletionToken, completion_handler) type_check;
asio::co_spawn(
asio::get_associated_executor(completion_handler), // Q6-2: should I use get_executor() here? Currently, I just get the callers executor.
[&, buffer = std::move(buffer), completion_handler = std::forward<CompletionToken>(completion_handler)]
() mutable -> asio::awaitable<void> {
const constexpr auto TAG = "AWS";
auto callerExe = co_await asio::this_coro::executor;
auto to_caller = asio::bind_executor(callerExe, asio::use_awaitable);
auto service = serviceRef.lock();
if (service == nullptr) {
std::move(completion_handler)(asio::error::bad_descriptor, 0);
co_return;
}
auto to_service = asio::bind_executor(service->strand, asio::use_awaitable);
co_await asio::post(to_service);
tout(TAG) << "performing write" << std::endl;
auto buf_begin = asio::buffers_begin(buffer);
auto buf_end = asio::buffers_end(buffer);
boost::system::error_code err = asio::error::fault;
size_t it = 0;
while (buf_begin != buf_end) {
service->bufferIn.push_back(static_cast<char>(*buf_begin++));
it++;
}
err = asio::stream_errc::eof;
completion:
co_await asio::post(to_caller); // without this call the function returns on the wrong thread
tout(TAG) << "write done returned" << std::endl;
std::move(completion_handler)(err, it);
}, asio::detached);
}, token);
}
};
asio::awaitable<int> mainCo() {
const constexpr auto TAG = "MainCo";
auto exe = co_await asio::this_coro::executor;
auto use_awaitable = asio::bind_executor(exe, asio::use_awaitable);
auto as_tuple = asio::experimental::as_tuple(use_awaitable);
auto use_future = asio::use_future;
auto timer = asio::steady_timer(exe);
asio::thread_pool servicePool{1};
co_await asio::post(asio::bind_executor(servicePool, asio::use_awaitable));
tout() << "ServiceThread run start" << std::endl;
co_await asio::post(use_awaitable);
auto service = std::make_shared<Service<boost::asio::thread_pool::basic_executor_type<std::allocator<void>, 0> >>(servicePool.get_executor());
service->init();
auto stream = MyAsyncStream{service, exe};
for (size_t it = 0; it < 4; it++) {
{
std::vector<char> dataBackend;
auto dynBuffer = asio::dynamic_buffer(dataBackend, 50);
auto [ec, n] = co_await asio::async_read(stream, dynBuffer, as_tuple); // Q7-1: Can I avoid using as_tuple here?
tout(TAG) << "read done: " << std::endl
<< "n: " << n << std::endl
<< "msg: " << std::string{dataBackend.begin(), dataBackend.end()} << std::endl
<< "ec: " << ec.message()
<< std::endl;
}
{
auto const constexpr str = std::string_view{"HelloW"};
std::vector<char> dataBackend{str.begin(), str.end()};
auto dynBuffer = asio::dynamic_buffer(dataBackend, 50);
auto [ec, n] = co_await asio::async_write(stream, dynBuffer, as_tuple); // Q7-2: Can I avoid using as_tuple here?
tout(TAG) << "write done: " << std::endl
<< "n: " << n << std::endl
<< "msg: " << str << std::endl
<< "ec: " << ec.message()
<< std::endl;
}
timer.expires_after(std::chrono::milliseconds(2500));
co_await timer.async_wait(use_awaitable);
}
servicePool.join();
tout(TAG) << "Normal exit" << std::endl;
co_return 0;
}
int main() {
asio::io_context appCtx;
auto fut = asio::co_spawn(asio::make_strand(appCtx), mainCo(), asio::use_future);
tout() << "MainThread run start" << std::endl;
appCtx.run();
tout() << "MainThread run done" << std::endl;
return fut.get();
}
Q1
我想看起来不错。但是,请参阅 Q2。
Q2
看起来它杀死了
AsyncStream
的 CTAD。如果我不得不猜测这是因为ServiceExecutor
在 non-deduced 上下文中。手动帮助它可能会有所帮助,但请注意此处的第二个静态断言是如何失败的:using ServiceExecutor = asio::thread_pool::executor_type; using CallerExecutor = asio::any_io_executor; static_assert(asio::is_executor<ServiceExecutor>::value); static_assert(asio::is_executor<CallerExecutor>::value);
那是因为
co_await this_coro::executor
returnsany_io_executor
,是不同“牌子”的executor。您需要使用execution::is_executor<T>::value
进行检查。事实上,您可能希望像在 Asio 实现函数中那样进行兼容性检查:(is_executor<Executor>::value || execution::is_executor<Executor>::value) && is_convertible<Executor, AwaitableExecutor>::value
PS: It dawned on me that the non-deduced context is a symptom of overly-specific template arguments. Just make
AsyncStream<Executor, Service>
(why bother with the specific type arguments that are implementation details ofService
?). That fixes the CTAD (Live On Compiler Explorer)template <typename CallerExecutor, typename Service> requires my_is_executor<CallerExecutor>::value // class MyAsyncStream {
Q3:这个字段应该存在吗?
CallerExecutor executor;
是的,这就是 IO 对象记住其绑定执行程序的方式。
Q4:那是你 return 调用者执行者所在的位置。
它没有在您的应用程序中调用,但它可能会被调用。如果您针对您的 IO 对象 (
MyAsyncStream
) 调用任何组合操作(如asio::async_read_until
),它会 - 默认情况下 - 运行 关联执行程序上的任何处理程序。这可能会添加正确性所需的行为(如处理程序序列化、工作跟踪等)。像以往一样,处理程序可以绑定到另一个执行程序以覆盖它。
Q5 我不这么认为,除非您想强制使用
use_awaitable
(或兼容的)完成令牌。事实上,你 运行 内部的 coro 应该是调用者的实现细节。Q6 是的,但不是而是关闭。我假设您需要使用 IO 对象的执行程序作为后备:
asio::get_associated_executor( completion_handler, this->get_executor())
Q7-1:我可以避免在这里使用as_tuple吗?
auto [ec, n] = co_await asio::async_read(stream, dynBuffer, as_tuple);
我想如果你能“仅仅”处理
system_error
个异常:auto n = co_await asio::async_read(stream, dynBuffer, use_awaitable);