C++ 嵌套协程销毁问题
C++ nested coroutine destruction issue
我正在试验 clang-5 及其协程 TS(Coroutines TS)实现。
我正在尝试将它与 boost asio 一起使用,但遇到了一个问题:我的协程栈帧似乎被销毁了两次,而我无法弄清楚自己哪里做错了。
这是我简化后的复现场景:
#include <experimental/coroutine>
#include <boost/asio.hpp>
#include <iostream>
using namespace boost::asio;
/// Builds an awaitable for connecting socket `s` to endpoint `ep`.
///
/// Awaiting the result starts `s.async_connect()`; the completion handler
/// stores the resulting error code in the awaiter and resumes the awaiting
/// coroutine. `await_resume()` throws `boost::system::system_error` when the
/// stored error code is set.
///
/// @param s   socket to connect; held by reference inside the awaiter
/// @param ep  endpoint to connect to; copied into the awaiter
/// @return    a function-local awaiter type (hence the `auto` return type)
inline auto async_connect(ip::tcp::socket& s, const ip::tcp::endpoint& ep)
{
    struct Awaiter
    {
        ip::tcp::socket& s;
        ip::tcp::endpoint ep;
        boost::system::error_code ec;
        bool ready = false;

        Awaiter(ip::tcp::socket& sock, const ip::tcp::endpoint& target)
            : s(sock)
            , ep(target)
        {
        }

        // `ready` only becomes true inside the completion handler, so a freshly
        // created awaiter reports "not ready".
        bool await_ready() { return ready; }

        void await_suspend(std::experimental::coroutine_handle<> coro)
        {
            // The handler captures `this`: it writes its result back into this awaiter.
            s.async_connect(ep, [this, coro] (auto result) mutable {
                ready = true;
                ec = result;
                std::cout << "Connect ready: resume " << coro.address() << "\n";
                coro.resume();
            });
            std::cout << "Connect initiated\n";
        }

        void await_resume()
        {
            if (!ec)
            {
                return;
            }
            throw boost::system::system_error(ec);
        }
    };

    return Awaiter(s, ep);
}
struct Future
{
struct promise_type
{
bool _ready = false;
std::experimental::coroutine_handle<> _waiter = nullptr;
Future get_return_object()
{
auto coro = std::experimental::coroutine_handle<promise_type>::from_promise(*this);
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << coro.address() << "]\n";
return Future(coro);
}
auto initial_suspend()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
return std::experimental::suspend_never();
}
auto final_suspend()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
return std::experimental::suspend_always();
}
void return_void()
{
_ready = true;
if (_waiter)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "] resume waiter[" << _waiter.address() << "]\n ";
_waiter.resume();
}
}
void unhandled_exception()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
std::terminate();
}
};
Future() = default;
Future(const Future&) = delete;
Future& operator=(const Future&) = delete;
Future& operator=(Future&& other)
{
_coro = other._coro;
other._coro = nullptr;
return *this;
}
explicit Future(std::experimental::coroutine_handle<promise_type> coro)
: _coro(coro)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
}
Future(Future&& f)
: _coro(f._coro)
{
f._coro = nullptr;
}
~Future()
{
if (_coro)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
_coro.destroy();
//_coro = nullptr;
}
}
bool await_ready()
{
assert(_coro);
std::cout << __FUNCTION__ << ":" << __LINE__ << ": ready " << _coro.promise()._ready << std::endl;
return _coro.promise()._ready;
}
void await_suspend(std::experimental::coroutine_handle<> callerCoro)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "] waiter [" << callerCoro.address() << "]\n";
_coro.promise()._waiter = callerCoro;
}
void await_resume()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
}
void* address()
{
return _coro.address();
}
std::experimental::coroutine_handle<promise_type> _coro = nullptr;
};
/// Diagnostic helper: prints a marker line containing its own address to
/// std::cout when it is destroyed, which makes scope exit visible in the output.
struct RaiiCheck
{
    ~RaiiCheck()
    {
        const void* self = this;
        std::cout << "******** ~RaiiCheck " << self << " ********\n";
    }
};
// Coroutine that opens `s` for the protocol of `ep` and then awaits the
// asynchronous connect started by async_connect(s, ep).
Future asioConnect(ip::tcp::socket& s, const ip::tcp::endpoint& ep)
{
// Prints a marker line from its destructor, making the teardown of this
// coroutine's locals visible in the program output.
RaiiCheck rc;
s.open(ep.protocol());
// The awaiter's await_resume() throws boost::system::system_error if the connect failed.
co_await async_connect(s, ep);
}
// Coroutine that awaits `awaitable` and, once that await has completed,
// stops `io`. `awaitable` is awaited as an lvalue; it is not forwarded.
template <typename Awaitable>
Future waitForIt(io_service& io, Awaitable&& awaitable)
{
co_await awaitable;
// Reached only after the awaited operation has resumed this coroutine.
io.stop();
}
/// Coroutine that connects a TCP socket to 8.8.8.8:53 through asioConnect().
///
/// A boost::system::system_error thrown inside the try block is caught and
/// reported on std::cerr. The message now says "Failed to connect" (it used to
/// claim an HTTP response could not be parsed, although nothing here parses one).
///
/// @param io  io_service the socket is created on
static Future performRequest(io_service& io)
{
    try
    {
        auto addr = ip::tcp::endpoint(ip::address::from_string("8.8.8.8"), 53);
        ip::tcp::socket socket(io);

        // Works
        //socket.open(addr.protocol());
        //co_await async_connect(socket, addr);

        // Crashes
        std::cout << "asioConnect\n";
        co_await asioConnect(socket, addr);
        assert(socket.is_open());
    }
    catch (const boost::system::system_error& e)
    {
        std::cerr << "Failed to connect: " << e.what() << "\n";
    }
}
int main(int, char**)
{
io_service io;
auto task = performRequest(io);
Future fut;
io.post([&] () {
fut = waitForIt(io, task);
});
io.run();
}
它生成以下输出:
get_return_object:51 [0x7fbbeed03c00]
Future:99 [0x7fbbeed03c00]
initial_suspend:57 [0x7fbbeed03c00]
asioConnect
get_return_object:51 [0x7fbbeed03d50]
Future:99 [0x7fbbeed03d50]
initial_suspend:57 [0x7fbbeed03d50]
Connect initiated
await_suspend:129 [0x7fbbeed03d50] waiter [0x7fbbeed03c00]
get_return_object:51 [0x7fbbeec028a0]
Future:99 [0x7fbbeec028a0]
initial_suspend:57 [0x7fbbeec028a0]
await_suspend:129 [0x7fbbeed03c00] waiter [0x7fbbeec028a0]
Connect ready: resume 0x7fbbeed03d50
******** ~RaiiCheck 0x7fbbeed03e14 ********
return_void:69 [0x7fbbeed03d50] resume waiter[0x7fbbeed03c00]
await_resume:135 [0x7fbbeed03d50]
~Future:113 [0x7fbbeed03d50]
******** ~RaiiCheck 0x7fbbeed03e14 ********
return_void:69 [0x7fbbeed03c00] resume waiter[0x7fbbeec028a0]
await_resume:135 [0x7fbbeed03c00]
return_void:69 [0x7fbbeec028a0]
final_suspend:63 [0x7fbbeec028a0]
final_suspend:63 [0x7fbbeed03c00]
final_suspend:63 [0x7fbbeed03d50]
~Future:113 [0x7fbbeec028a0]
~Future:113 [0x7fbbeed03c00]
corotest(74949,0x7fffc163c3c0) malloc: *** error for object 0x7fbbeed03d50: incorrect checksum for freed object - object was probably modified after being freed.
*** set a breakpoint in malloc_error_break to debug
Abort trap: 6
如您所见,asioConnect 函数中的 RaiiCheck 对象被销毁了两次。
asioConnect 函数内的 Future 对象持有地址为 0x7fbbeed03d50 的 coroutine_handle。当在 s.async_connect 的完成回调中恢复此协程时,RaiiCheck 对象第一次被销毁。随后我看到 return_void 函数被调用,它恢复了父协程,而父协程似乎又再次恢复了内部协程,导致 RaiiCheck 对象被第二次销毁。
这似乎与我的协程嵌套有关,因为如果我直接对 async_connect 调用进行 co_await,而不是把 async_connect 调用放在另一个协程中,它就可以正常工作。
非常感谢任何解决此问题的提示。
你应该只在 final_suspend() 中恢复等待者(waiter),而不是在 return_void() 中恢复,因为在到达 final_suspend() 之前协程帧尚未展开,而在协程最终挂起之前,你无法对它调用 destroy()。
final_suspend() 的更正代码如下所示:
auto final_suspend()
{
struct awaiter
{
std::experimental::coroutine_handle<> _waiter;
bool await_ready() { return false; }
void await_suspend(std::experimental::coroutine_handle<> coro)
{
if (_waiter)
{
_waiter();
}
}
void await_resume() {}
};
return awaiter{_waiter};
}
我正在试验 clang-5 及其协程 TS(Coroutines TS)实现。 我正在尝试将它与 boost asio 一起使用,但遇到了一个问题:我的协程栈帧似乎被销毁了两次,而我无法弄清楚自己哪里做错了。
这是我简化后的复现场景:
#include <experimental/coroutine>
#include <boost/asio.hpp>
#include <iostream>
using namespace boost::asio;
/// Builds an awaitable for connecting socket `s` to endpoint `ep`.
///
/// Awaiting the result starts `s.async_connect()`; the completion handler
/// stores the resulting error code in the awaiter and resumes the awaiting
/// coroutine. `await_resume()` throws `boost::system::system_error` when the
/// stored error code is set.
///
/// @param s   socket to connect; held by reference inside the awaiter
/// @param ep  endpoint to connect to; copied into the awaiter
/// @return    a function-local awaiter type (hence the `auto` return type)
inline auto async_connect(ip::tcp::socket& s, const ip::tcp::endpoint& ep)
{
    struct Awaiter
    {
        ip::tcp::socket& s;
        ip::tcp::endpoint ep;
        boost::system::error_code ec;
        bool ready = false;

        Awaiter(ip::tcp::socket& sock, const ip::tcp::endpoint& target)
            : s(sock)
            , ep(target)
        {
        }

        // `ready` only becomes true inside the completion handler, so a freshly
        // created awaiter reports "not ready".
        bool await_ready() { return ready; }

        void await_suspend(std::experimental::coroutine_handle<> coro)
        {
            // The handler captures `this`: it writes its result back into this awaiter.
            s.async_connect(ep, [this, coro] (auto result) mutable {
                ready = true;
                ec = result;
                std::cout << "Connect ready: resume " << coro.address() << "\n";
                coro.resume();
            });
            std::cout << "Connect initiated\n";
        }

        void await_resume()
        {
            if (!ec)
            {
                return;
            }
            throw boost::system::system_error(ec);
        }
    };

    return Awaiter(s, ep);
}
struct Future
{
struct promise_type
{
bool _ready = false;
std::experimental::coroutine_handle<> _waiter = nullptr;
Future get_return_object()
{
auto coro = std::experimental::coroutine_handle<promise_type>::from_promise(*this);
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << coro.address() << "]\n";
return Future(coro);
}
auto initial_suspend()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
return std::experimental::suspend_never();
}
auto final_suspend()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
return std::experimental::suspend_always();
}
void return_void()
{
_ready = true;
if (_waiter)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "] resume waiter[" << _waiter.address() << "]\n ";
_waiter.resume();
}
}
void unhandled_exception()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
std::terminate();
}
};
Future() = default;
Future(const Future&) = delete;
Future& operator=(const Future&) = delete;
Future& operator=(Future&& other)
{
_coro = other._coro;
other._coro = nullptr;
return *this;
}
explicit Future(std::experimental::coroutine_handle<promise_type> coro)
: _coro(coro)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
}
Future(Future&& f)
: _coro(f._coro)
{
f._coro = nullptr;
}
~Future()
{
if (_coro)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
_coro.destroy();
//_coro = nullptr;
}
}
bool await_ready()
{
assert(_coro);
std::cout << __FUNCTION__ << ":" << __LINE__ << ": ready " << _coro.promise()._ready << std::endl;
return _coro.promise()._ready;
}
void await_suspend(std::experimental::coroutine_handle<> callerCoro)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "] waiter [" << callerCoro.address() << "]\n";
_coro.promise()._waiter = callerCoro;
}
void await_resume()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
}
void* address()
{
return _coro.address();
}
std::experimental::coroutine_handle<promise_type> _coro = nullptr;
};
/// Diagnostic helper: prints a marker line containing its own address to
/// std::cout when it is destroyed, which makes scope exit visible in the output.
struct RaiiCheck
{
    ~RaiiCheck()
    {
        const void* self = this;
        std::cout << "******** ~RaiiCheck " << self << " ********\n";
    }
};
// Coroutine that opens `s` for the protocol of `ep` and then awaits the
// asynchronous connect started by async_connect(s, ep).
Future asioConnect(ip::tcp::socket& s, const ip::tcp::endpoint& ep)
{
// Prints a marker line from its destructor, making the teardown of this
// coroutine's locals visible in the program output.
RaiiCheck rc;
s.open(ep.protocol());
// The awaiter's await_resume() throws boost::system::system_error if the connect failed.
co_await async_connect(s, ep);
}
// Coroutine that awaits `awaitable` and, once that await has completed,
// stops `io`. `awaitable` is awaited as an lvalue; it is not forwarded.
template <typename Awaitable>
Future waitForIt(io_service& io, Awaitable&& awaitable)
{
co_await awaitable;
// Reached only after the awaited operation has resumed this coroutine.
io.stop();
}
/// Coroutine that connects a TCP socket to 8.8.8.8:53 through asioConnect().
///
/// A boost::system::system_error thrown inside the try block is caught and
/// reported on std::cerr. The message now says "Failed to connect" (it used to
/// claim an HTTP response could not be parsed, although nothing here parses one).
///
/// @param io  io_service the socket is created on
static Future performRequest(io_service& io)
{
    try
    {
        auto addr = ip::tcp::endpoint(ip::address::from_string("8.8.8.8"), 53);
        ip::tcp::socket socket(io);

        // Works
        //socket.open(addr.protocol());
        //co_await async_connect(socket, addr);

        // Crashes
        std::cout << "asioConnect\n";
        co_await asioConnect(socket, addr);
        assert(socket.is_open());
    }
    catch (const boost::system::system_error& e)
    {
        std::cerr << "Failed to connect: " << e.what() << "\n";
    }
}
int main(int, char**)
{
io_service io;
auto task = performRequest(io);
Future fut;
io.post([&] () {
fut = waitForIt(io, task);
});
io.run();
}
它生成以下输出:
get_return_object:51 [0x7fbbeed03c00]
Future:99 [0x7fbbeed03c00]
initial_suspend:57 [0x7fbbeed03c00]
asioConnect
get_return_object:51 [0x7fbbeed03d50]
Future:99 [0x7fbbeed03d50]
initial_suspend:57 [0x7fbbeed03d50]
Connect initiated
await_suspend:129 [0x7fbbeed03d50] waiter [0x7fbbeed03c00]
get_return_object:51 [0x7fbbeec028a0]
Future:99 [0x7fbbeec028a0]
initial_suspend:57 [0x7fbbeec028a0]
await_suspend:129 [0x7fbbeed03c00] waiter [0x7fbbeec028a0]
Connect ready: resume 0x7fbbeed03d50
******** ~RaiiCheck 0x7fbbeed03e14 ********
return_void:69 [0x7fbbeed03d50] resume waiter[0x7fbbeed03c00]
await_resume:135 [0x7fbbeed03d50]
~Future:113 [0x7fbbeed03d50]
******** ~RaiiCheck 0x7fbbeed03e14 ********
return_void:69 [0x7fbbeed03c00] resume waiter[0x7fbbeec028a0]
await_resume:135 [0x7fbbeed03c00]
return_void:69 [0x7fbbeec028a0]
final_suspend:63 [0x7fbbeec028a0]
final_suspend:63 [0x7fbbeed03c00]
final_suspend:63 [0x7fbbeed03d50]
~Future:113 [0x7fbbeec028a0]
~Future:113 [0x7fbbeed03c00]
corotest(74949,0x7fffc163c3c0) malloc: *** error for object 0x7fbbeed03d50: incorrect checksum for freed object - object was probably modified after being freed.
*** set a breakpoint in malloc_error_break to debug
Abort trap: 6
如您所见,asioConnect 函数中的 RaiiCheck 对象被销毁了两次。
asioConnect 函数内的 Future 对象持有地址为 0x7fbbeed03d50 的 coroutine_handle。当在 s.async_connect 的完成回调中恢复此协程时,RaiiCheck 对象第一次被销毁。随后我看到 return_void 函数被调用,它恢复了父协程,而父协程似乎又再次恢复了内部协程,导致 RaiiCheck 对象被第二次销毁。
这似乎与我的协程嵌套有关,因为如果我直接对 async_connect 调用进行 co_await,而不是把 async_connect 调用放在另一个协程中,它就可以正常工作。
非常感谢任何解决此问题的提示。
你应该只在 final_suspend() 中恢复等待者(waiter),而不是在 return_void() 中恢复,因为在到达 final_suspend() 之前协程帧尚未展开,而在协程最终挂起之前,你无法对它调用 destroy()。
final_suspend() 的更正代码如下所示:
auto final_suspend()
{
struct awaiter
{
std::experimental::coroutine_handle<> _waiter;
bool await_ready() { return false; }
void await_suspend(std::experimental::coroutine_handle<> coro)
{
if (_waiter)
{
_waiter();
}
}
void await_resume() {}
};
return awaiter{_waiter};
}