组合协程

Composed coroutines

我正在学习 C++ 20 协程。还是有些不明白,不过我看了几个关于这个话题的视频。

我正在尝试获得一个可组合的协程。我已经了解了对称传输来执行此操作,这是我的代码(请原谅长示例):

#include <coroutine>
#include <iostream>


// forward declaration
template <class T> struct promise;

// simple wrapper to follow the workflow
struct MySuspendAlways
{                                  // (1)
    bool await_ready() const noexcept {
        std::cout << "\tMySuspendAlways::await_ready" << '\n';
        return false;
    }

    bool await_suspend(std::coroutine_handle<> handle) const noexcept {
        std::cout << "\tMySuspendAlways::await_suspend coro handle "  << handle.address() << '\n';
        return true;
    }

    void await_resume() const noexcept {
        std::cout << "\tMySuspendAlways::await_resume" << '\n';
    }
};


template < typename T>
struct SimpleAwaitable
{
    std::coroutine_handle<promise<T>> coro_;

    SimpleAwaitable(std::coroutine_handle<promise<T>> coro) :
        coro_{ coro }
    {
        std::cout << "\n\tSimpleAwaitable::coro handle = " << coro_.address();
    }

    bool await_ready() 
    { 
        std::cout << "\n\tSimpleAwaitable::await_ready coro handle " << coro_.address();
        if (!coro_.done())
        {
            return false;
        }

        return true; 
    }

    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuationCoro)
    {
        std::cout << "\n\tSimpleAwaitable::await_suspend coro handle " << coro_.address();
        coro_.promise().set_continuation(continuationCoro);
        return coro_;
    }

    decltype(auto) await_resume()
    {
        std::cout << "\n\tSimpleAwaitable::await_resume coro handle " << coro_.address();
        return coro_.promise().value_;
    }
};



struct final_awaitable
{
    bool await_ready() noexcept { return false; };

    void await_resume() noexcept {}
    
    template < typename P>
    std::coroutine_handle<> await_suspend(std::coroutine_handle<P> finalizedCoro) noexcept
    {
        auto& continuation = finalizedCoro.promise().continuation_;
        return continuation ? continuation : std::noop_coroutine();
    }
};


// promise type
template <typename T>
struct promise
{
    T value_;
    std::coroutine_handle<> continuation_;

    auto get_return_object() noexcept
    {
        return std::coroutine_handle<promise>::from_promise(*this);
    }

    MySuspendAlways initial_suspend() noexcept
    {     
        std::cout << "\t|_Job prepared" << '\n';
        return {};
    }

    final_awaitable final_suspend() noexcept {  
        std::cout << "\t|_Job finished" << '\n';
        return {};
    }

    void unhandled_exception() noexcept
    {
        std::terminate();
    }

    void return_value(T t) noexcept { value_ = std::move(t); }
    
    void set_continuation(std::coroutine_handle<> continuation)  noexcept
    {
        std::cout << "\tset chain with coro hadle = " << continuation.address() << '\n';
        continuation_ = continuation;
    }

    auto continuation() noexcept
    {
        return continuation_;
    }
};


template < typename T>
class task_ 
{
public:
    
    using promise_type = promise<T>;

    task_(std::coroutine_handle<promise_type> handle) :
        handle_(handle)
        {}

    ~task_() 
    {
        if (handle_)
        {
            std::cout << "\n\t\tDestoy " << handle_.address();
            handle_.destroy();
        }
    }

    T value() noexcept { return handle_.promise().value_; }

    void resume() noexcept
    { 
        if (!handle_.done())
        {
            handle_.resume();
        }
    }
    
    bool is_ready() noexcept { return handle_.done(); }

    auto operator co_await() const noexcept
    {
        std::cout << "\n\tCO_AWAIT\n";
        return SimpleAwaitable<T>{handle_};
    }

 private:
     std::coroutine_handle<promise<T>> handle_;
};



task_<std::string> suspending_2()
{
    std::cout << "\n[, ]";
    co_await std::suspend_always{};
    std::cout << "\n[WORLD]";
    co_return "done";
}

task_<std::string> composed_2()
{
    std::cout << "\n[HELLO]";
    co_await suspending_2();
    std::cout << "\n[!!]";
    co_return "end";
}




int main() {
     task_<std::string> x = composed_2();
     while (!x.is_ready())
     {
         std::cout << "\nresuming...";
         x.resume();
     }
     return 0;
}

我预计这个流程:

HELLO -> ,_ -> WORLD -> !!

但我得到的是:

HELLO -> ,_ , -> !!

不过我仔细看了final awaiter,continuation handle,await_suspend返回的coro handle恢复continuation,没用

有协程高手帮我解释一下哪里错了吗?

谢谢

您永远不会恢复协程 suspending_2。这就是为什么你的协程管理会发生内存泄漏的原因。

当您 co_await suspending_2 时,您使用 composed_2 句柄设置 promise::continuation_

基本上,你写的是,当你暂停composed_2时,获取suspending_2的句柄,并在suspending_2完成时恢复它。那是没有意义的,因为它永远不会完成。您必须做的是在恢复 composed_2 之前恢复 suspending_2。基本上恢复composed_2就必须恢复suspending_2.

为此,您可以将 suspending_2 的句柄保存在 composed_2 的承诺中,并在必要时恢复它。

这是我根据您的代码修改的函数:

您的简单 Awaitable 和最终 awaitable。

template <typename T> struct SimpleAwaitable {
  std::coroutine_handle<promise<T>> coro_;

  SimpleAwaitable(std::coroutine_handle<promise<T>> coro) : coro_{coro} {
    std::cout << "\n\tSimpleAwaitable::coro handle = " << coro_.address();
  }

  bool await_ready() {
    std::cout << "\n\tSimpleAwaitable::await_ready coro handle "
              << coro_.address();
    if (!coro_.done()) {
      return false;
    }

    return true;
  }

  void await_suspend(std::coroutine_handle<promise<T>> continuationCoro) {
    std::cout << "\n\tSimpleAwaitable::await_suspend coro handle "
              << coro_.address();
    continuationCoro.promise().set_continuation(coro_);
  }

  decltype(auto) await_resume() {
    std::cout << "\n\tSimpleAwaitable::await_resume coro handle "
              << coro_.address();
    return coro_.promise().value_;
  }
};

struct final_awaitable {
  bool await_ready() noexcept { return false; };

  void await_resume() noexcept {}

  void await_suspend(std::coroutine_handle<> finalizedCoro) noexcept {}
};

还有你的简历功能

  void resume() noexcept {
    std::cout << "resume!!:"
              << " " << handle_.address() << std::endl;
    if (handle_.promise().continuation_) {
      while (!handle_.promise().continuation_.done()) {
        std::cout << "resume!!:"
                  << " " << handle_.promise().continuation_.address()
                  << std::endl;
        handle_.promise().continuation_.resume();
      }
    }
    if (!handle_.done()) {
      handle_.resume();
    }
  }

不过,还是有内存泄漏,让你自己处理。 这不是直截了当的,但我鼓励你在做一些像这样复杂的事情之前尝试更好地理解协程是如何工作的:)。

我也奉劝大家不要那样管理std::coroutine_handle,封装成一个任务,不要交给其他任务(像现在这样)。只需将任务传递给其他任务即可形成一个延续链:).

如果您还有其他问题,请不要犹豫。