如何在 Rust 中使用 StreamExt::scan 方法改变异步块内的状态?

How can I mutate state inside async block with StreamExt::scan method in Rust?

我正在尝试使用 StreamExt 中的扫描方法。如果我没有异步块,它会完美运行。

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(0, |s, i| {
            *s += i;
            futures::future::ready(Some(*s))
        })
        .for_each(|x| async move {
            println!("{:?}", x);
        })
        .await;
}

但是如果我有 async 块,它不会编译。

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(0, |s, i| async move {
            *s += i;
            Some(*s)
        })
        .for_each(|x| async move {
            println!("{:?}", x);
        })
        .await;
}

错误是:

error: borrowed data cannot be stored outside of its closure
  --> src/main.rs:6:36
   |
5  |  /     stream::iter(1..10)
6  |  |         .scan(0, |s, i| async move {
   |  |__________________------____________^
   | ||                  |
   | ||                  ...because it cannot outlive this closure
7  | ||             *s += i;
8  | ||             Some(*s)
9  | ||         })
   | ||_________^ cannot be stored outside of its closure
...   |
12 |  |         })
13 |  |         .await;
   |  |______________- borrowed data cannot be stored into here...

如何解决这个问题并在 async 块内改变状态?

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(0, |s, i| {       // s: &mut i32, i: i32
            *s += i;
            let st = *s;        // Dereference `s`, and move the value of `s` to `st`, but the type of
                                // `*s` is i32 which impl Copy trait, so here Copy `*s` value to
                                // the owned `st`.
                                // 
                                // Note: this works only if `s` impl the Copy trait

            async move {        // move the owned i32 st into the async block (copy again)
                                // and also move i into the async block which is copyed too
                Some(st)
            }
        })
        .for_each(|x| {
            println!("{:?}", x);
            async {}
        })
        .await;
}

如果 s 不包含 Copy 特征,您必须手动 copy/clone 手动:

use futures::{stream, StreamExt};

#[derive(Debug, Clone)]
struct TT {
    s: i32,
}

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(TT{s: 0}, |s, i|  {
            s.s += i;
            // let mut st = *s;      // move occurs because `*s` has type `TT`, which does not implement the `Copy` trait
            let st = s.clone();      // clone/copy manually
            async move {
                Some(st)
            }
        })
        .for_each(|x| {
            println!("{:?}", x);
            async {}
        })
        .await;
}

如果你想改变异步块中的s,你必须通过智能指针拥有它,比如Rc,或Arc。并且还需要一种方法来修改智能指针包裹的值,如MutexRwLockCell:

use futures::{stream, StreamExt, lock::Mutex};
use std::sync::Arc;  // Arc or Rc

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(Arc::new(Mutex::new(0)), |s, i| { // s: &mut Arc<Mutex<i32>>, i: i32
            let s = Arc::clone(s);              // Clone to owned the Arc

            async move {        // move the owned `s` into the async block
                                // and also move i into the async block which is copyed too
                let mut guard = s.lock().await;
                *guard += i;
                Some(*guard)
            }
        })
        .for_each(|x| {
            println!("{:?}", x);
            async {}
        })
        .await;
}

您不能跨 async 边界共享引用。一旦代码在 async 上下文中执行,就无法再跟踪生命周期,因为编译器不知道未来何时完成。

一种解决方案是使用引用计数智能指针和内部可变性:

use futures::{stream, StreamExt};
use std::cell::Cell;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(Rc::new(Cell::new(0)), |s, i| {
            let s = s.clone();
            async move {
                s.set(s.get() + i);
                Some(s.get())
            }
        })
        .for_each(|x| async move {
            println!("{:?}", x);
        })
        .await;
}

闭包参数中的 s 是一个 &mut Rc<Cell<i32>>,不能跨越 async 边界移动。但是克隆版本是一个 Rc<Cell<i32>> 可以 移动到那里,因为没有生命周期可以跟踪。