async-stream + 无论如何都不是 Send

async-stream + anyhow is not Send

我有一个函数 returns Stream<Item=Result<..>>.

use futures::StreamExt;
use parking_lot::Mutex;
use std::sync::Arc;

fn get_stream() -> impl futures::Stream<Item = anyhow::Result<u8>> {
    async_stream::try_stream! {
        let a = Arc::new(Mutex::new(()));
        for item in &[1,2,3] {
            {
                let mut _guard = a.try_lock().expect("aa");
                Err(anyhow::anyhow!("asdf"))?;
            }
            {
                yield *item;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let _s = get_stream().boxed();
}

应该立即删除互斥保护 _guard,但 rustc 抱怨 get_stream 由于 _guard 而未发送。

error: future cannot be sent between threads safely
  --> src/main.rs:22:27
   |
22 |     let _s = get_stream().boxed();
   |                           ^^^^^ future created by async block is not `Send`
   |
   = help: within `impl futures::Stream`, the trait `std::marker::Send` is not implemented for `*mut ()`
note: future is not `Send` as this value is used across an await
  --> src/main.rs:6:5
   |
6  | /     async_stream::try_stream! {
7  | |         let a = Arc::new(Mutex::new(()));
8  | |         for item in &[1,2,3] {
9  | |             {
...  |
16 | |         }
17 | |     }
   | |_____^ first, await occurs here, with `mut _guard` maybe used later...
note: `mut _guard` is later dropped here
  --> src/main.rs:6:5
   |
6  | /     async_stream::try_stream! {
7  | |         let a = Arc::new(Mutex::new(()));
8  | |         for item in &[1,2,3] {
9  | |             {
10 | |                 let mut _guard = a.try_lock().expect("aa");
   | |                     ---------- has type `parking_lot::lock_api::MutexGuard<'_, parking_lot::RawMutex, ()>` which is not `Send`
...  |
16 | |         }
17 | |     }
   | |_____^
   = note: this error originates in the macro `async_stream::try_stream` (in Nightly builds, run with -Z macro-backtrace for more info)

将语句的顺序更改为

Err(anyhow::anyhow!("asdf"))?;
let mut _guard = a.try_lock().expect("aa");            

不会编译出错。为什么是这样? anyhow 可能会生成堆栈跟踪,但它会捕获 anyhow::Error 中的局部变量吗?

这里的问题是代码扩展为包含 .await 点的匹配语句,其中 Err 通过用于实现流的通道发送:

let lock = a.try_lock();
match Err({
    let error = ::anyhow::private::format_err(
        match match () {
            () => [],
        } {
            ref args => unsafe { ::core::fmt::Arguments::new_v1(&["asdf"], args) },
        },
    );
    error
}) {
    ::core::result::Result::Ok(v) => v,
    ::core::result::Result::Err(e) => {
        __yield_tx.send(::core::result::Result::Err(e.into())).await;
        return;
    }
};
// ..rest of the function

锁在包括 await 点在内的整个 match 期间都被视为有效,因此您将获得 !Send 未来。

这个问题的一个更简单的例子是:

fn foo() -> impl Future<Output = ()> + Send {
    async {
        let e: Result<(), ()> = Err(());
        let a = Arc::new(Mutex::new(()));
        let mut lock = a.lock();
        if e.is_err() {
            futures::future::ready(e).await;
            return;
        }
    }
}

解决此问题的最直接方法是您在 yield 在函数末尾对项目进行操作之前已经做的事情:将所有锁定封装在它自己的范围内,而不在其中发出任何流项目范围。

这可以通过函数内的范围界定来完成 - 当您想使用 ? 运算符发出错误时会变得很烦人 - 或者通过同步函数或同步闭包在内部锁定互斥锁并在返回之前将其删除。通过这种方式,您可以通过 ? 对结果进行相当符合人体工程学的错误处理:

fn get_stream() -> impl futures::Stream<Item = anyhow::Result<u8>> {
    async_stream::try_stream! {
        let a = Arc::new(Mutex::new(()));
        for item in &[1,2,3] {
            let scoped = || {
                let lock = a.try_lock().expect("aa");
                Err(anyhow::anyhow!("asdf"))?;
                Ok::<(), anyhow::Error>(*lock)
            };
            scoped()?;
            yield *item;
        }
    }
}