使用 Tokio 生成非静态未来
Spawn non-static future with Tokio
我有一个异步方法应该并行执行一些 futures,并且只有 return 在所有 futures 完成之后。但是,它通过引用传递了一些数据,这些数据的寿命不会与 'static
一样长(它将在 main 方法的某个时刻被删除)。从概念上讲,它类似于此 (Playground):
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in array {
let task = spawn(do_sth(i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
#[tokio::main]
async fn main() {
parallel_stuff(&[3, 1, 4, 2]);
}
现在,tokio 希望传递给 spawn
的期货在 'static
生命周期内有效,因为我可以在 future 不停止的情况下放下手柄。这意味着我上面的示例会产生此错误消息:
error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:12:25
|
12 | async fn parallel_stuff(array: &[u64]) {
| ^^^^^ ------ this data with an anonymous lifetime `'_`...
| |
| ...is captured here...
...
15 | let task = spawn(do_sth(i));
| ----- ...and is required to live as long as `'static` here
所以我的问题是:如何生成仅对当前上下文有效的期货,然后我可以等到所有期货都完成?
不可能从异步 Rust 产生非'static
未来。这是因为任何异步函数都可能随时被取消,所以无法保证调用者真的比生成的任务活得更久。
确实有各种 crate 允许在范围内生成异步任务,但是这些 crate 不能从异步代码中使用。他们做的允许从非异步代码中产生作用域异步任务。这并不违反上述问题,因为生成它们的非异步代码不能随时取消,因为它不是异步的。
通常有两种方法:
- 使用
Arc
而不是普通引用生成 'static
任务。
- 使用 futures crate 中的并发原语而不是生成。
通常要生成静态任务并使用 Arc
,您必须拥有相关值的所有权。这意味着由于您的函数通过引用获取参数,因此您不能在不克隆数据的情况下使用此技术。
async fn do_sth(with: Arc<[u64]>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &[u64]) {
// Make a clone of the data so we can shared it across tasks.
let shared: Arc<[u64]> = Arc::from(array);
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
请注意,如果您有对数据的可变引用,并且数据是 Sized
,即不是切片,则可以暂时取得它的所有权。
async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &mut Vec<u64>) {
// Swap the array with an empty one to temporarily take ownership.
let vec = std::mem::take(array);
let shared = Arc::new(vec);
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}
// Put back the vector where we took it from.
// This works because there is only one Arc left.
*array = Arc::try_unwrap(shared).unwrap();
}
另一种选择是使用 futures 包中的并发原语。这些具有处理非 'static
数据的优点,但缺点是任务将无法同时在多个线程上 运行。
对于许多工作流程来说,这非常好,因为异步代码无论如何都应该花费大部分时间等待 IO。
一种方法是使用 FuturesUnordered
。这是一个特殊的集合,可以存储许多不同的未来,它有一个 next
功能,可以同时 运行 所有这些,一旦第一个完成就 returns 。 (next
功能只有在导入StreamExt
时才有效)
你可以这样使用它:
use futures::stream::{FuturesUnordered, StreamExt};
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks = FuturesUnordered::new();
for i in array {
let task = do_sth(i);
tasks.push(task);
}
// This loop runs everything concurrently, and waits until they have
// all finished.
while let Some(()) = tasks.next().await { }
}
注意: FuturesUnordered
必须定义 在 共享值之后。否则你会得到一个借用错误,这是由于它们以错误的顺序被丢弃而导致的。
另一种方法是使用 Stream
。对于流,您可以使用 buffer_unordered
。这是一个在内部使用 FuturesUnordered
的实用程序。
use futures::stream::StreamExt;
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
// Create a stream going through the array.
futures::stream::iter(array)
// For each item in the stream, create a future.
.map(|i| do_sth(i))
// Run at most 10 of the futures concurrently.
.buffer_unordered(10)
// Since Streams are lazy, we must use for_each or collect to run them.
// Here we use for_each and do nothing with the return value from do_sth.
.for_each(|()| async {})
.await;
}
请注意,在这两种情况下,导入 StreamExt
都很重要,因为它提供了各种方法,如果不导入扩展特征,这些方法在流中不可用。
如果代码使用线程进行并行处理,则可以通过 extending a lifetime with transmute 避免复制。一个例子:
fn main() {
let now = std::time::Instant::now();
let string = format!("{now:?}");
println!(
"{now:?} has length {}",
parallel_len(&[&string, &string]) / 2
);
}
fn parallel_len(input: &[&str]) -> usize {
// SAFETY: this variable needs to be static, because it is passed into a thread,
// but the thread does not live longer than this function, because we wait for
// it to finish by calling `join` on it.
let input: &[&'static str] = unsafe { std::mem::transmute(input) };
let mut threads = vec![];
for txt in input {
threads.push(std::thread::spawn(|| txt.len()));
}
threads.into_iter().map(|t| t.join().unwrap()).sum()
}
这似乎也适用于异步代码,这似乎是合理的,但我对此了解不多,无法确定。
我有一个异步方法应该并行执行一些 futures,并且只有 return 在所有 futures 完成之后。但是,它通过引用传递了一些数据,这些数据的寿命不会与 'static
一样长(它将在 main 方法的某个时刻被删除)。从概念上讲,它类似于此 (Playground):
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in array {
let task = spawn(do_sth(i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
#[tokio::main]
async fn main() {
parallel_stuff(&[3, 1, 4, 2]);
}
现在,tokio 希望传递给 spawn
的期货在 'static
生命周期内有效,因为我可以在 future 不停止的情况下放下手柄。这意味着我上面的示例会产生此错误消息:
error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:12:25
|
12 | async fn parallel_stuff(array: &[u64]) {
| ^^^^^ ------ this data with an anonymous lifetime `'_`...
| |
| ...is captured here...
...
15 | let task = spawn(do_sth(i));
| ----- ...and is required to live as long as `'static` here
所以我的问题是:如何生成仅对当前上下文有效的期货,然后我可以等到所有期货都完成?
不可能从异步 Rust 产生非'static
未来。这是因为任何异步函数都可能随时被取消,所以无法保证调用者真的比生成的任务活得更久。
确实有各种 crate 允许在范围内生成异步任务,但是这些 crate 不能从异步代码中使用。他们做的允许从非异步代码中产生作用域异步任务。这并不违反上述问题,因为生成它们的非异步代码不能随时取消,因为它不是异步的。
通常有两种方法:
- 使用
Arc
而不是普通引用生成'static
任务。 - 使用 futures crate 中的并发原语而不是生成。
通常要生成静态任务并使用 Arc
,您必须拥有相关值的所有权。这意味着由于您的函数通过引用获取参数,因此您不能在不克隆数据的情况下使用此技术。
async fn do_sth(with: Arc<[u64]>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &[u64]) {
// Make a clone of the data so we can shared it across tasks.
let shared: Arc<[u64]> = Arc::from(array);
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
请注意,如果您有对数据的可变引用,并且数据是 Sized
,即不是切片,则可以暂时取得它的所有权。
async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &mut Vec<u64>) {
// Swap the array with an empty one to temporarily take ownership.
let vec = std::mem::take(array);
let shared = Arc::new(vec);
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}
// Put back the vector where we took it from.
// This works because there is only one Arc left.
*array = Arc::try_unwrap(shared).unwrap();
}
另一种选择是使用 futures 包中的并发原语。这些具有处理非 'static
数据的优点,但缺点是任务将无法同时在多个线程上 运行。
对于许多工作流程来说,这非常好,因为异步代码无论如何都应该花费大部分时间等待 IO。
一种方法是使用 FuturesUnordered
。这是一个特殊的集合,可以存储许多不同的未来,它有一个 next
功能,可以同时 运行 所有这些,一旦第一个完成就 returns 。 (next
功能只有在导入StreamExt
时才有效)
你可以这样使用它:
use futures::stream::{FuturesUnordered, StreamExt};
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks = FuturesUnordered::new();
for i in array {
let task = do_sth(i);
tasks.push(task);
}
// This loop runs everything concurrently, and waits until they have
// all finished.
while let Some(()) = tasks.next().await { }
}
注意: FuturesUnordered
必须定义 在 共享值之后。否则你会得到一个借用错误,这是由于它们以错误的顺序被丢弃而导致的。
另一种方法是使用 Stream
。对于流,您可以使用 buffer_unordered
。这是一个在内部使用 FuturesUnordered
的实用程序。
use futures::stream::StreamExt;
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
// Create a stream going through the array.
futures::stream::iter(array)
// For each item in the stream, create a future.
.map(|i| do_sth(i))
// Run at most 10 of the futures concurrently.
.buffer_unordered(10)
// Since Streams are lazy, we must use for_each or collect to run them.
// Here we use for_each and do nothing with the return value from do_sth.
.for_each(|()| async {})
.await;
}
请注意,在这两种情况下,导入 StreamExt
都很重要,因为它提供了各种方法,如果不导入扩展特征,这些方法在流中不可用。
如果代码使用线程进行并行处理,则可以通过 extending a lifetime with transmute 避免复制。一个例子:
fn main() {
let now = std::time::Instant::now();
let string = format!("{now:?}");
println!(
"{now:?} has length {}",
parallel_len(&[&string, &string]) / 2
);
}
fn parallel_len(input: &[&str]) -> usize {
// SAFETY: this variable needs to be static, because it is passed into a thread,
// but the thread does not live longer than this function, because we wait for
// it to finish by calling `join` on it.
let input: &[&'static str] = unsafe { std::mem::transmute(input) };
let mut threads = vec![];
for txt in input {
threads.push(std::thread::spawn(|| txt.len()));
}
threads.into_iter().map(|t| t.join().unwrap()).sum()
}
这似乎也适用于异步代码,这似乎是合理的,但我对此了解不多,无法确定。