带有 tokio 的异步 Rust 中相互关联的期货集合

A collection of interconnected futures in async Rust w/ tokio

问题陈述

我想在异步 Rust 中实现一个有向非循环计算图框架,即计算“节点”的互连图,每个计算“节点”都从前任节点获取输入并为后继节点产生输出。我计划通过产生一组 Future 来实现这一点,每个计算节点一个,同时允许期货之间的依赖关系。但是,在使用 async 实现此框架时,我已经无可救药地迷失在编译器错误中。

最小示例

这是我想做的最简单的例子。有一个浮点数输入列表 values,任务是创建一个新列表 output,其中 output[i] = values[i] + output[i - 2]。这是我试过的:

use std::sync;

fn some_complicated_expensive_fn(val1: f32, val2: f32) -> f32 {
    val1 + val2
}

fn example_async(values: &Vec<f32>) -> Vec<f32> {
    let runtime = tokio::runtime::Runtime::new().unwrap();

    let join_handles = sync::Arc::new(sync::Mutex::new(Vec::<tokio::task::JoinHandle<f32>>::new()));
    for (i, value) in values.iter().enumerate() {
        let future = {
            let join_handles = join_handles.clone();
            async move {
                if i < 2 {
                    *value
                } else {
                    let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
                    some_complicated_expensive_fn(*value, prev_value)
                }
            }
        };
        join_handles.lock().unwrap().push(runtime.spawn(future));
    }
    join_handles
        .lock()
        .unwrap()
        .iter_mut()
        .map(|join_handle| runtime.block_on(join_handle).unwrap())
        .collect()
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_example() {
        let values = vec![1., 2., 3., 4., 5., 6.];
        println!("{:?}", super::example_async(&values));
    }
}

我收到关于解锁的 Mutex 不是 Send 的错误:

error: future cannot be sent between threads safely
  --> sim/src/compsim/runtime.rs:23:51
   |
23 |         join_handles.lock().unwrap().push(runtime.spawn(future));
   |                                                   ^^^^^ future created by async block is not `Send`
   |
   = help: within `impl Future`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>`
note: future is not `Send` as this value is used across an await
  --> sim/src/compsim/runtime.rs:18:38
   |
18 |                     let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
   |                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ first, await occurs here, with `join_handles.lock().unwrap()` maybe used later...
note: `join_handles.lock().unwrap()` is later dropped here
  --> sim/src/compsim/runtime.rs:18:88
   |
18 |                     let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
   |                                      ----------------------------                      ^
   |                                      |
   |                                      has type `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>` which is not `Send`
help: consider moving this into a `let` binding to create a shorter lived borrow
  --> sim/src/compsim/runtime.rs:18:38
   |
18 |                     let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
   |                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

这是有道理的,我在 Tokio docs 中看到您可以改用 tokio::task::Mutex,但是 a) 我不确定如何,b) 我想知道是否有我缺少的更好的整体方法。帮助不胜感激!谢谢。

编译器抱怨您无法在 join_handle 被锁定的情况下越过等待点,这是因为任务可能在 .await 之后被另一个线程拾取,并且锁定必须在同一个线程中锁定和解锁。您可以通过缩短锁定时间来解决此问题,例如通过在等待之前将每个句柄保存在 Optiontaking 中。但是随后您 运行 遇到等待 JoinHandle 消耗 它的问题 - 您收到任务 return 的值,并且您失去句柄,所以你不能 return 它到矢量。 (这是 Rust 值只有一个所有者的结果,所以一旦句柄将值传递给您,它就不再拥有它并且变得无用了。)

句柄基本上就像是生成任务结果的一次性通道。由于您需要将结果放在另一个地方,因此您可以单独创建一个 one-shot channels 的 vector 来保存另一份结果,供需要它们的任务等待。

pub fn example_async(values: &[f32]) -> Vec<f32> {
    let runtime = tokio::runtime::Runtime::new().unwrap();

    let (txs, rxs): (Vec<_>, Vec<_>) = (0..values.len())
        .map(|_| {
            let (tx, rx) = tokio::sync::oneshot::channel();
            (Mutex::new(Some(tx)), Mutex::new(Some(rx)))
        })
        .unzip();
    let txs = Arc::new(txs);
    let rxs = Arc::new(rxs);

    let mut join_handles = vec![];
    for (i, value) in values.iter().copied().enumerate() {
        let txs = Arc::clone(&txs);
        let rxs = Arc::clone(&rxs);
        let future = async move {
            let result = if i < 2 {
                value
            } else {
                let prev_rx = rxs[i - 2].lock().unwrap().take().unwrap();
                let prev_value = prev_rx.await.unwrap();
                some_complicated_expensive_fn(value, prev_value)
            };
            let tx = txs[i].lock().unwrap().take().unwrap();
            tx.send(result).unwrap(); // here you'd use result.clone() for non-Copy result
            result
        };
        join_handles.push(runtime.spawn(future));
    }
    join_handles
        .into_iter()
        .map(|handle| runtime.block_on(handle).unwrap())
        .collect()
}

Playground