如何在 Rust 中使用 StreamExt::scan 方法改变异步块内的状态?
How can I mutate state inside async block with StreamExt::scan method in Rust?
我正在尝试使用 StreamExt 中的扫描方法。如果我没有异步块,它会完美运行。
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(0, |s, i| {
*s += i;
futures::future::ready(Some(*s))
})
.for_each(|x| async move {
println!("{:?}", x);
})
.await;
}
但是如果我有 async
块,它不会编译。
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(0, |s, i| async move {
*s += i;
Some(*s)
})
.for_each(|x| async move {
println!("{:?}", x);
})
.await;
}
错误是:
error: borrowed data cannot be stored outside of its closure
--> src/main.rs:6:36
|
5 | / stream::iter(1..10)
6 | | .scan(0, |s, i| async move {
| |__________________------____________^
| || |
| || ...because it cannot outlive this closure
7 | || *s += i;
8 | || Some(*s)
9 | || })
| ||_________^ cannot be stored outside of its closure
... |
12 | | })
13 | | .await;
| |______________- borrowed data cannot be stored into here...
如何解决这个问题并在 async
块内改变状态?
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(0, |s, i| { // s: &mut i32, i: i32
*s += i;
let st = *s; // Dereference `s`, and move the value of `s` to `st`, but the type of
// `*s` is i32 which impl Copy trait, so here Copy `*s` value to
// the owned `st`.
//
// Note: this works only if `s` impl the Copy trait
async move { // move the owned i32 st into the async block (copy again)
// and also move i into the async block which is copyed too
Some(st)
}
})
.for_each(|x| {
println!("{:?}", x);
async {}
})
.await;
}
如果 s
不包含 Copy
特征,您必须手动 copy/clone 手动:
use futures::{stream, StreamExt};
#[derive(Debug, Clone)]
struct TT {
s: i32,
}
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(TT{s: 0}, |s, i| {
s.s += i;
// let mut st = *s; // move occurs because `*s` has type `TT`, which does not implement the `Copy` trait
let st = s.clone(); // clone/copy manually
async move {
Some(st)
}
})
.for_each(|x| {
println!("{:?}", x);
async {}
})
.await;
}
如果你想改变异步块中的s
,你必须通过智能指针拥有它,比如Rc
,或Arc
。并且还需要一种方法来修改智能指针包裹的值,如Mutex
、RwLock
或Cell
:
use futures::{stream, StreamExt, lock::Mutex};
use std::sync::Arc; // Arc or Rc
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(Arc::new(Mutex::new(0)), |s, i| { // s: &mut Arc<Mutex<i32>>, i: i32
let s = Arc::clone(s); // Clone to owned the Arc
async move { // move the owned `s` into the async block
// and also move i into the async block which is copyed too
let mut guard = s.lock().await;
*guard += i;
Some(*guard)
}
})
.for_each(|x| {
println!("{:?}", x);
async {}
})
.await;
}
您不能跨 async
边界共享引用。一旦代码在 async
上下文中执行,就无法再跟踪生命周期,因为编译器不知道未来何时完成。
一种解决方案是使用引用计数智能指针和内部可变性:
use futures::{stream, StreamExt};
use std::cell::Cell;
use std::rc::Rc;
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(Rc::new(Cell::new(0)), |s, i| {
let s = s.clone();
async move {
s.set(s.get() + i);
Some(s.get())
}
})
.for_each(|x| async move {
println!("{:?}", x);
})
.await;
}
闭包参数中的 s
是一个 &mut Rc<Cell<i32>>
,不能跨越 async
边界移动。但是克隆版本是一个 Rc<Cell<i32>>
, 可以 移动到那里,因为没有生命周期可以跟踪。
我正在尝试使用 StreamExt 中的扫描方法。如果我没有异步块,它会完美运行。
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(0, |s, i| {
*s += i;
futures::future::ready(Some(*s))
})
.for_each(|x| async move {
println!("{:?}", x);
})
.await;
}
但是如果我有 async
块,它不会编译。
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(0, |s, i| async move {
*s += i;
Some(*s)
})
.for_each(|x| async move {
println!("{:?}", x);
})
.await;
}
错误是:
error: borrowed data cannot be stored outside of its closure
--> src/main.rs:6:36
|
5 | / stream::iter(1..10)
6 | | .scan(0, |s, i| async move {
| |__________________------____________^
| || |
| || ...because it cannot outlive this closure
7 | || *s += i;
8 | || Some(*s)
9 | || })
| ||_________^ cannot be stored outside of its closure
... |
12 | | })
13 | | .await;
| |______________- borrowed data cannot be stored into here...
如何解决这个问题并在 async
块内改变状态?
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(0, |s, i| { // s: &mut i32, i: i32
*s += i;
let st = *s; // Dereference `s`, and move the value of `s` to `st`, but the type of
// `*s` is i32 which impl Copy trait, so here Copy `*s` value to
// the owned `st`.
//
// Note: this works only if `s` impl the Copy trait
async move { // move the owned i32 st into the async block (copy again)
// and also move i into the async block which is copyed too
Some(st)
}
})
.for_each(|x| {
println!("{:?}", x);
async {}
})
.await;
}
如果 s
不包含 Copy
特征,您必须手动 copy/clone 手动:
use futures::{stream, StreamExt};
#[derive(Debug, Clone)]
struct TT {
s: i32,
}
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(TT{s: 0}, |s, i| {
s.s += i;
// let mut st = *s; // move occurs because `*s` has type `TT`, which does not implement the `Copy` trait
let st = s.clone(); // clone/copy manually
async move {
Some(st)
}
})
.for_each(|x| {
println!("{:?}", x);
async {}
})
.await;
}
如果你想改变异步块中的s
,你必须通过智能指针拥有它,比如Rc
,或Arc
。并且还需要一种方法来修改智能指针包裹的值,如Mutex
、RwLock
或Cell
:
use futures::{stream, StreamExt, lock::Mutex};
use std::sync::Arc; // Arc or Rc
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(Arc::new(Mutex::new(0)), |s, i| { // s: &mut Arc<Mutex<i32>>, i: i32
let s = Arc::clone(s); // Clone to owned the Arc
async move { // move the owned `s` into the async block
// and also move i into the async block which is copyed too
let mut guard = s.lock().await;
*guard += i;
Some(*guard)
}
})
.for_each(|x| {
println!("{:?}", x);
async {}
})
.await;
}
您不能跨 async
边界共享引用。一旦代码在 async
上下文中执行,就无法再跟踪生命周期,因为编译器不知道未来何时完成。
一种解决方案是使用引用计数智能指针和内部可变性:
use futures::{stream, StreamExt};
use std::cell::Cell;
use std::rc::Rc;
#[tokio::main]
async fn main() {
stream::iter(1..10)
.scan(Rc::new(Cell::new(0)), |s, i| {
let s = s.clone();
async move {
s.set(s.get() + i);
Some(s.get())
}
})
.for_each(|x| async move {
println!("{:?}", x);
})
.await;
}
闭包参数中的 s
是一个 &mut Rc<Cell<i32>>
,不能跨越 async
边界移动。但是克隆版本是一个 Rc<Cell<i32>>
, 可以 移动到那里,因为没有生命周期可以跟踪。