tokio 在 Rust 中加入多个任务

tokio join multiple tasks in rust

假设一些 futures 存储在一个 Vec 中,其长度由运行时决定,你应该同时加入这些 futures,你应该怎么做?

显然,根据 tokio::join 文档中的示例,手动指定 Vec 可能的每个长度,例如 1、2、3... .

extern crate tokio;

let v = Vec::new();
v.push(future_1);

// directly or indirectly you push many futures to the vector
 
v.push(future_N);

// to join these futures concurrently one possible way is 

if v.len() == 0 {}
if v.len() == 1 { join!(v.pop()); }
if v.len() == 2 { join!(v.pop(), v.pop() ); }
// ...

而且我还注意到tokio::join!当我使用

这样的语法时,将列表作为文档中的参数
tokio::join!(v);

或类似

tokio::join![ v ] /  tokio::join![ v[..] ] / tokio::join![ v[..][..] ]

根本行不通

问题来了,是否有任何途径可以更有效地加入这些未来,或者我是否应该遗漏一些与文件所说的不符的内容?

您可以使用 futures::future::join_all 将您的期货集合“合并”成一个单一的期货,当所有子期货都结算时结算。

join_alltry_join_all,以及来自同一个 crate futures 的更通用的 FuturesOrderedFuturesUnordered 实用程序作为单个任务执行.如果组成 futures 并不经常同时准备好执行工作,这可能没问题,但是如果你想利用 CPU 多线程运行时的并行性,请考虑将单个 futures 作为单独的任务生成并等待加入句柄:

use futures::future;

// ...

let outputs = future::try_join_all(v.into_iter().map(tokio::spawn)).await?;

您还可以使用 FuturesOrderedFuturesUnordered 组合器在流中异步处理输出:

use futures::stream::FuturesUnordered;
use futures::prelude::*;

// ...

let mut completion_stream = v.into_iter()
    .map(tokio::spawn)
    .collect::<FuturesUnordered<_>>();
while let Some(res) = completion_stream.next().await {
    // ...    
}

使用任务的一个警告是,当产生任务并可能拥有返回的 JoinHandle 的未来(例如异步块)被删除时,它们不会被取消。需要使用JoinHandle::abort方法显式取消任务。