如何在从 Tokio 运行时上下文中的异步方法调用的非异步方法中等待未来?

How do I await a future inside a non-async method which was called from an async method within the context of a Tokio runtime?

我正在使用 Tokio 1.1 来处理异步操作。我有一个 async main#[tokio::main],所以我已经在使用运行时了。

main 调用一个非异步方法,我希望在未来成为 await(具体来说,我正在从数据融合数据帧中收集数据)。这个非异步方法有一个由 returns 结构而不是 Future<Struct> 特征规定的签名。据我所知,我无法将其标记为异步。

如果我尝试调用 df.collect().await;,我会得到

only allowed inside async functions and blocks

编译器错误,指出我在其中调用 await 的方法不是 async

如果我尝试 block_on 像这样的新运行时的未来:

tokio::runtime::Builder::new_current_thread()
    .build()
    .unwrap()
    .block_on(df.collect());

我遇到运行时恐慌:

Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.

如果我尝试 futures::executor::block_on(df.collect()).unwrap();,我会遇到新的运行时恐慌:

'not currently running on a Tokio 0.2.x runtime.'

这很奇怪,因为我使用的是 Tokio v1.1。

这感觉比它应该的要难。我在异步上下文中,感觉编译器应该知道这一点并允许我从方法内调用 .await - 唯一的代码路径从 async 块内调用此方法。有没有我缺少的简单方法?

I'm within an async context, and it feels like the compiler should know that and allow me to call .await from within the method

无论您是否在 运行 时间的上下文中,在同步函数内部 await 基本上是不可能的。 await 被转换为屈服点,async 函数被转换为利用这些屈服点执行异步计算的状态机。如果不将您的函数标记为 async,则此转换是不可能的。

如果我没有正确理解你的问题,你有以下代码:

#[tokio::main]
async fn main() {
    let foo = Foo {};
    foo.bar()
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}

问题是您无法从 bar 中等待 df.collect,因为它未标记为 async。如果您可以修改 Trait 的签名,那么您可以使用 .

中提到的解决方法使 Trait::bar 成为异步方法

如果您无法更改 Trait 的签名,那么您就有问题了。异步函数应该 永远不会 花费很长时间而不达到 .await。如 中所述,您可以在转换为非异步代码时使用 spawn_blocking

#[tokio::main]
async fn main() {
    let foo = Foo {};
    tokio::task::spawn_blocking(move || foo.bar()).await
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}

现在您需要一种无需等待即可 运行 df.collect 完成的方法。您提到您试图创建一个嵌套的 运行time 来解决这个问题:

If I try and block_on the future from a new runtime ... I get a panic

但是,tokio 不允许您创建嵌套 运行 次。您可以创建一个新的独立运行时间,如中所述。但是,生成嵌套的 运行time 效率很低。

您可以获得 当前 运行时间的句柄:

,而不是生成新的 运行时间
let handle = Handle::current();

输入运行时间背景:

handle.enter();

然后 运行 未来完成 futures::executor::block_on:

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        let handle = Handle::current();
        handle.enter();
        futures::executor::block_on(df.collect())
    }
}

进入 tokio 运行时间上下文将解决您之前遇到的错误:

If I try futures::executor::block_on(df.collect()).unwrap();, I get a new runtime panic not currently running on a Tokio 0.2.x runtime

我强烈建议您尽量避免这样做。最佳解决方案是将 Trait::bar 标记为 async 并将 await 标记为正常。任何其他解决方案,包括上面提到的那些,都涉及阻塞当前线程,直到给定的未来完成。

感谢@AliceRyhl 的解释