使用 Tokio 生成非静态未来

Spawn non-static future with Tokio

我有一个异步方法应该并行执行一些 futures,并且只有 return 在所有 futures 完成之后。但是,它通过引用传递了一些数据,这些数据的寿命不会与 'static 一样长(它将在 main 方法的某个时刻被删除)。从概念上讲,它类似于此 (Playground):

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in array {
        let task = spawn(do_sth(i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

#[tokio::main]
async fn main() {
    parallel_stuff(&[3, 1, 4, 2]);
}

现在,tokio 希望传递给 spawn 的期货在 'static 生命周期内有效,因为我可以在 future 不停止的情况下放下手柄。这意味着我上面的示例会产生此错误消息:

error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/main.rs:12:25
   |
12 | async fn parallel_stuff(array: &[u64]) {
   |                         ^^^^^  ------ this data with an anonymous lifetime `'_`...
   |                         |
   |                         ...is captured here...
...
15 |         let task = spawn(do_sth(i));
   |                    ----- ...and is required to live as long as `'static` here

所以我的问题是:如何生成仅对当前上下文有效的期货,然后我可以等到所有期货都完成?

不可能从异步 Rust 产生非'static 未来。这是因为任何异步函数都可能随时被取消,所以无法保证调用者真的比生成的任务活得更久。

确实有各种 crate 允许在范围内生成异步任务,但是这些 crate 不能从异步代码中使用。他们做的允许从非异步代码中产生作用域异步任务。这并不违反上述问题,因为生成它们的非异步代码不能随时取消,因为它不是异步的。

通常有两种方法:

  1. 使用 Arc 而不是普通引用生成 'static 任务。
  2. 使用 futures crate 中的并发原语而不是生成。

通常要生成静态任务并使用 Arc,您必须拥有相关值的所有权。这意味着由于您的函数通过引用获取参数,因此您不能在不克隆数据的情况下使用此技术。

async fn do_sth(with: Arc<[u64]>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &[u64]) {
    // Make a clone of the data so we can shared it across tasks.
    let shared: Arc<[u64]> = Arc::from(array);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

请注意,如果您有对数据的可变引用,并且数据是 Sized,即不是切片,则可以暂时取得它的所有权。

async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &mut Vec<u64>) {
    // Swap the array with an empty one to temporarily take ownership.
    let vec = std::mem::take(array);
    let shared = Arc::new(vec);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
    
    // Put back the vector where we took it from.
    // This works because there is only one Arc left.
    *array = Arc::try_unwrap(shared).unwrap();
}

另一种选择是使用 futures 包中的并发原语。这些具有处理非 'static 数据的优点,但缺点是任务将无法同时在多个线程上 运行。

对于许多工作流程来说,这非常好,因为异步代码无论如何都应该花费大部分时间等待 IO。

一种方法是使用 FuturesUnordered。这是一个特殊的集合,可以存储许多不同的未来,它有一个 next 功能,可以同时 运行 所有这些,一旦第一个完成就 returns 。 (next功能只有在导入StreamExt时才有效)

你可以这样使用它:

use futures::stream::{FuturesUnordered, StreamExt};

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks = FuturesUnordered::new();
    for i in array {
        let task = do_sth(i);
        tasks.push(task);
    }
    // This loop runs everything concurrently, and waits until they have
    // all finished.
    while let Some(()) = tasks.next().await { }
}

注意: FuturesUnordered 必须定义 共享值之后。否则你会得到一个借用错误,这是由于它们以错误的顺序被丢弃而导致的。


另一种方法是使用 Stream。对于流,您可以使用 buffer_unordered。这是一个在内部使用 FuturesUnordered 的实用程序。

use futures::stream::StreamExt;

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    // Create a stream going through the array.
    futures::stream::iter(array)
    // For each item in the stream, create a future.
        .map(|i| do_sth(i))
    // Run at most 10 of the futures concurrently.
        .buffer_unordered(10)
    // Since Streams are lazy, we must use for_each or collect to run them.
    // Here we use for_each and do nothing with the return value from do_sth.
        .for_each(|()| async {})
        .await;
}

请注意,在这两种情况下,导入 StreamExt 都很重要,因为它提供了各种方法,如果不导入扩展特征,这些方法在流中不可用。

如果代码使用线程进行并行处理,则可以通过 extending a lifetime with transmute 避免复制。一个例子:

fn main() {
    let now = std::time::Instant::now();
    let string = format!("{now:?}");
    println!(
        "{now:?} has length {}",
        parallel_len(&[&string, &string]) / 2
    );
}

fn parallel_len(input: &[&str]) -> usize {
    // SAFETY: this variable needs to be static, because it is passed into a thread,
    // but the thread does not live longer than this function, because we wait for
    // it to finish by calling `join` on it.
    let input: &[&'static str] = unsafe { std::mem::transmute(input) };
    let mut threads = vec![];
    for txt in input {
        threads.push(std::thread::spawn(|| txt.len()));
    }
    threads.into_iter().map(|t| t.join().unwrap()).sum()
}

这似乎也适用于异步代码,这似乎是合理的,但我对此了解不多,无法确定。