如何等待 Rust 中的异步函数调用列表?

How to wait for a list of async function calls in rust?

我在 Rust 中有一个 async 函数列表,我想同时执行这些函数,然后等待它们全部完成。我现在的工作代码是

 async fn start_consumers(&self) {
    for consumer in &self.consumers {
        consumer.consume().await;
    }
}

这不太准确,因为函数是串行执行的。我正在寻找类似 join! 的东西,但它适用于动态向量,使用它我应该能够写出类似

的东西
 async fn start_consumers(&self) {
    let mut v = Vec::new();
    for consumer in &self.consumers {
        consumer.consume();
    }
    join!(v);
}

目前join!仅支持元组。我正在为此寻找替代方案。类似于 JavaScript.

中的 Promise.all()

所以经过一番搜索,我发现 rust futures 有一个名为 join_all 的函数,它允许等待一个 futures 集合。

 use futures::future::join_all;
 ....

 async fn start_consumers(&self) {
    let mut v = Vec::new();
    for consumer in &self.consumers {
        v.push(consumer.consume());
    }
    join_all(v).await;
 }

同一天我也问了类似的问题,但在我的情况下,我有一个 Result 包裹在 Future 中。所以我不得不使用 try_join_all

而不是 join_all

如果你想 await/join 来自普通同步函数的所有 Future,并且不关心它们的结果,你可以这样写:

futures::executor::block_on(async{futures::join!(future1, future2, future3, ...)});

您可以使用这个宏 block_all 更符合人体工程学的用法:

macro_rules! block_all {
    ($($future:expr),*) => {{
        futures::executor::block_on(async{futures::join!($($future),*)})   
    }};
}

用法:

block_all!(future1, future2, ...);

join_all/try_join_all 可以解决问题,但输出是一个 Vec,它收集了期货的结果。在上面的修改示例中,合并的 future 产生一个 Vec<()>,它不会导致分配,甚至扩展这个向量的操作也应该在发布版本中优化为无。

即使在您确实需要输出的情况下,也可能值得在它们以流的形式异步出现时对其进行处理,而不是等待所有输出都被收集。为此,您可以使用 FuturesOrdered or FuturesUnordered,具体取决于您是关心在流产生的输出中保留原始期货的顺序,还是更喜欢按完成顺序接收输出。 FuturesUnordered 不需要对结果进行缓冲,并且可能比由相同期货组成的 FuturesOrdered 完成得更快。

The easiest way to do this is to use an mpsc channel where, instead of sending messages, you wait for the channel to be closed, which happens when every sender has been dropped.

查看 tokio 的示例是 here