带有 tokio 的异步 Rust 中相互关联的期货集合
A collection of interconnected futures in async Rust w/ tokio
问题陈述
我想在异步 Rust 中实现一个有向非循环计算图框架,即计算“节点”的互连图,每个计算“节点”都从前任节点获取输入并为后继节点产生输出。我计划通过产生一组 Future
来实现这一点,每个计算节点一个,同时允许期货之间的依赖关系。但是,在使用 async
实现此框架时,我已经无可救药地迷失在编译器错误中。
最小示例
这是我想做的最简单的例子。有一个浮点数输入列表 values
,任务是创建一个新列表 output
,其中 output[i] = values[i] + output[i - 2]
。这是我试过的:
use std::sync;
fn some_complicated_expensive_fn(val1: f32, val2: f32) -> f32 {
val1 + val2
}
fn example_async(values: &Vec<f32>) -> Vec<f32> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let join_handles = sync::Arc::new(sync::Mutex::new(Vec::<tokio::task::JoinHandle<f32>>::new()));
for (i, value) in values.iter().enumerate() {
let future = {
let join_handles = join_handles.clone();
async move {
if i < 2 {
*value
} else {
let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
some_complicated_expensive_fn(*value, prev_value)
}
}
};
join_handles.lock().unwrap().push(runtime.spawn(future));
}
join_handles
.lock()
.unwrap()
.iter_mut()
.map(|join_handle| runtime.block_on(join_handle).unwrap())
.collect()
}
#[cfg(test)]
mod tests {
#[test]
fn test_example() {
let values = vec![1., 2., 3., 4., 5., 6.];
println!("{:?}", super::example_async(&values));
}
}
我收到关于解锁的 Mutex
不是 Send
的错误:
error: future cannot be sent between threads safely
--> sim/src/compsim/runtime.rs:23:51
|
23 | join_handles.lock().unwrap().push(runtime.spawn(future));
| ^^^^^ future created by async block is not `Send`
|
= help: within `impl Future`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>`
note: future is not `Send` as this value is used across an await
--> sim/src/compsim/runtime.rs:18:38
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ first, await occurs here, with `join_handles.lock().unwrap()` maybe used later...
note: `join_handles.lock().unwrap()` is later dropped here
--> sim/src/compsim/runtime.rs:18:88
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ---------------------------- ^
| |
| has type `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>` which is not `Send`
help: consider moving this into a `let` binding to create a shorter lived borrow
--> sim/src/compsim/runtime.rs:18:38
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
这是有道理的,我在 Tokio docs 中看到您可以改用 tokio::task::Mutex
,但是 a) 我不确定如何,b) 我想知道是否有我缺少的更好的整体方法。帮助不胜感激!谢谢。
编译器抱怨您无法在 join_handle
被锁定的情况下越过等待点,这是因为任务可能在 .await
之后被另一个线程拾取,并且锁定必须在同一个线程中锁定和解锁。您可以通过缩短锁定时间来解决此问题,例如通过在等待之前将每个句柄保存在 Option
、taking 中。但是随后您 运行 遇到等待 JoinHandle
消耗 它的问题 - 您收到任务 return 的值,并且您失去句柄,所以你不能 return 它到矢量。 (这是 Rust 值只有一个所有者的结果,所以一旦句柄将值传递给您,它就不再拥有它并且变得无用了。)
句柄基本上就像是生成任务结果的一次性通道。由于您需要将结果放在另一个地方,因此您可以单独创建一个 one-shot channels 的 vector 来保存另一份结果,供需要它们的任务等待。
pub fn example_async(values: &[f32]) -> Vec<f32> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let (txs, rxs): (Vec<_>, Vec<_>) = (0..values.len())
.map(|_| {
let (tx, rx) = tokio::sync::oneshot::channel();
(Mutex::new(Some(tx)), Mutex::new(Some(rx)))
})
.unzip();
let txs = Arc::new(txs);
let rxs = Arc::new(rxs);
let mut join_handles = vec![];
for (i, value) in values.iter().copied().enumerate() {
let txs = Arc::clone(&txs);
let rxs = Arc::clone(&rxs);
let future = async move {
let result = if i < 2 {
value
} else {
let prev_rx = rxs[i - 2].lock().unwrap().take().unwrap();
let prev_value = prev_rx.await.unwrap();
some_complicated_expensive_fn(value, prev_value)
};
let tx = txs[i].lock().unwrap().take().unwrap();
tx.send(result).unwrap(); // here you'd use result.clone() for non-Copy result
result
};
join_handles.push(runtime.spawn(future));
}
join_handles
.into_iter()
.map(|handle| runtime.block_on(handle).unwrap())
.collect()
}
问题陈述
我想在异步 Rust 中实现一个有向非循环计算图框架,即计算“节点”的互连图,每个计算“节点”都从前任节点获取输入并为后继节点产生输出。我计划通过产生一组 Future
来实现这一点,每个计算节点一个,同时允许期货之间的依赖关系。但是,在使用 async
实现此框架时,我已经无可救药地迷失在编译器错误中。
最小示例
这是我想做的最简单的例子。有一个浮点数输入列表 values
,任务是创建一个新列表 output
,其中 output[i] = values[i] + output[i - 2]
。这是我试过的:
use std::sync;
fn some_complicated_expensive_fn(val1: f32, val2: f32) -> f32 {
val1 + val2
}
fn example_async(values: &Vec<f32>) -> Vec<f32> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let join_handles = sync::Arc::new(sync::Mutex::new(Vec::<tokio::task::JoinHandle<f32>>::new()));
for (i, value) in values.iter().enumerate() {
let future = {
let join_handles = join_handles.clone();
async move {
if i < 2 {
*value
} else {
let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
some_complicated_expensive_fn(*value, prev_value)
}
}
};
join_handles.lock().unwrap().push(runtime.spawn(future));
}
join_handles
.lock()
.unwrap()
.iter_mut()
.map(|join_handle| runtime.block_on(join_handle).unwrap())
.collect()
}
#[cfg(test)]
mod tests {
#[test]
fn test_example() {
let values = vec![1., 2., 3., 4., 5., 6.];
println!("{:?}", super::example_async(&values));
}
}
我收到关于解锁的 Mutex
不是 Send
的错误:
error: future cannot be sent between threads safely
--> sim/src/compsim/runtime.rs:23:51
|
23 | join_handles.lock().unwrap().push(runtime.spawn(future));
| ^^^^^ future created by async block is not `Send`
|
= help: within `impl Future`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>`
note: future is not `Send` as this value is used across an await
--> sim/src/compsim/runtime.rs:18:38
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ first, await occurs here, with `join_handles.lock().unwrap()` maybe used later...
note: `join_handles.lock().unwrap()` is later dropped here
--> sim/src/compsim/runtime.rs:18:88
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ---------------------------- ^
| |
| has type `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>` which is not `Send`
help: consider moving this into a `let` binding to create a shorter lived borrow
--> sim/src/compsim/runtime.rs:18:38
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
这是有道理的,我在 Tokio docs 中看到您可以改用 tokio::task::Mutex
,但是 a) 我不确定如何,b) 我想知道是否有我缺少的更好的整体方法。帮助不胜感激!谢谢。
编译器抱怨您无法在 join_handle
被锁定的情况下越过等待点,这是因为任务可能在 .await
之后被另一个线程拾取,并且锁定必须在同一个线程中锁定和解锁。您可以通过缩短锁定时间来解决此问题,例如通过在等待之前将每个句柄保存在 Option
、taking 中。但是随后您 运行 遇到等待 JoinHandle
消耗 它的问题 - 您收到任务 return 的值,并且您失去句柄,所以你不能 return 它到矢量。 (这是 Rust 值只有一个所有者的结果,所以一旦句柄将值传递给您,它就不再拥有它并且变得无用了。)
句柄基本上就像是生成任务结果的一次性通道。由于您需要将结果放在另一个地方,因此您可以单独创建一个 one-shot channels 的 vector 来保存另一份结果,供需要它们的任务等待。
pub fn example_async(values: &[f32]) -> Vec<f32> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let (txs, rxs): (Vec<_>, Vec<_>) = (0..values.len())
.map(|_| {
let (tx, rx) = tokio::sync::oneshot::channel();
(Mutex::new(Some(tx)), Mutex::new(Some(rx)))
})
.unzip();
let txs = Arc::new(txs);
let rxs = Arc::new(rxs);
let mut join_handles = vec![];
for (i, value) in values.iter().copied().enumerate() {
let txs = Arc::clone(&txs);
let rxs = Arc::clone(&rxs);
let future = async move {
let result = if i < 2 {
value
} else {
let prev_rx = rxs[i - 2].lock().unwrap().take().unwrap();
let prev_value = prev_rx.await.unwrap();
some_complicated_expensive_fn(value, prev_value)
};
let tx = txs[i].lock().unwrap().take().unwrap();
tx.send(result).unwrap(); // here you'd use result.clone() for non-Copy result
result
};
join_handles.push(runtime.spawn(future));
}
join_handles
.into_iter()
.map(|handle| runtime.block_on(handle).unwrap())
.collect()
}