async-stream + 无论如何都不是 Send
async-stream + anyhow is not Send
我有一个函数 returns Stream<Item=Result<..>>
.
use futures::StreamExt;
use parking_lot::Mutex;
use std::sync::Arc;
fn get_stream() -> impl futures::Stream<Item = anyhow::Result<u8>> {
async_stream::try_stream! {
let a = Arc::new(Mutex::new(()));
for item in &[1,2,3] {
{
let mut _guard = a.try_lock().expect("aa");
Err(anyhow::anyhow!("asdf"))?;
}
{
yield *item;
}
}
}
}
#[tokio::main]
async fn main() {
let _s = get_stream().boxed();
}
应该立即删除互斥保护 _guard
,但 rustc
抱怨 get_stream
由于 _guard
而未发送。
error: future cannot be sent between threads safely
--> src/main.rs:22:27
|
22 | let _s = get_stream().boxed();
| ^^^^^ future created by async block is not `Send`
|
= help: within `impl futures::Stream`, the trait `std::marker::Send` is not implemented for `*mut ()`
note: future is not `Send` as this value is used across an await
--> src/main.rs:6:5
|
6 | / async_stream::try_stream! {
7 | | let a = Arc::new(Mutex::new(()));
8 | | for item in &[1,2,3] {
9 | | {
... |
16 | | }
17 | | }
| |_____^ first, await occurs here, with `mut _guard` maybe used later...
note: `mut _guard` is later dropped here
--> src/main.rs:6:5
|
6 | / async_stream::try_stream! {
7 | | let a = Arc::new(Mutex::new(()));
8 | | for item in &[1,2,3] {
9 | | {
10 | | let mut _guard = a.try_lock().expect("aa");
| | ---------- has type `parking_lot::lock_api::MutexGuard<'_, parking_lot::RawMutex, ()>` which is not `Send`
... |
16 | | }
17 | | }
| |_____^
= note: this error originates in the macro `async_stream::try_stream` (in Nightly builds, run with -Z macro-backtrace for more info)
将语句的顺序更改为
Err(anyhow::anyhow!("asdf"))?;
let mut _guard = a.try_lock().expect("aa");
不会编译出错。为什么是这样? anyhow
可能会生成堆栈跟踪,但它会捕获 anyhow::Error
中的局部变量吗?
这里的问题是代码扩展为包含 .await
点的匹配语句,其中 Err
通过用于实现流的通道发送:
let lock = a.try_lock();
match Err({
let error = ::anyhow::private::format_err(
match match () {
() => [],
} {
ref args => unsafe { ::core::fmt::Arguments::new_v1(&["asdf"], args) },
},
);
error
}) {
::core::result::Result::Ok(v) => v,
::core::result::Result::Err(e) => {
__yield_tx.send(::core::result::Result::Err(e.into())).await;
return;
}
};
// ..rest of the function
锁在包括 await
点在内的整个 match
期间都被视为有效,因此您将获得 !Send
未来。
这个问题的一个更简单的例子是:
fn foo() -> impl Future<Output = ()> + Send {
async {
let e: Result<(), ()> = Err(());
let a = Arc::new(Mutex::new(()));
let mut lock = a.lock();
if e.is_err() {
futures::future::ready(e).await;
return;
}
}
}
解决此问题的最直接方法是您在 yield
在函数末尾对项目进行操作之前已经做的事情:将所有锁定封装在它自己的范围内,而不在其中发出任何流项目范围。
这可以通过函数内的范围界定来完成 - 当您想使用 ?
运算符发出错误时会变得很烦人 - 或者通过同步函数或同步闭包在内部锁定互斥锁并在返回之前将其删除。通过这种方式,您可以通过 ?
对结果进行相当符合人体工程学的错误处理:
fn get_stream() -> impl futures::Stream<Item = anyhow::Result<u8>> {
async_stream::try_stream! {
let a = Arc::new(Mutex::new(()));
for item in &[1,2,3] {
let scoped = || {
let lock = a.try_lock().expect("aa");
Err(anyhow::anyhow!("asdf"))?;
Ok::<(), anyhow::Error>(*lock)
};
scoped()?;
yield *item;
}
}
}
我有一个函数 returns Stream<Item=Result<..>>
.
use futures::StreamExt;
use parking_lot::Mutex;
use std::sync::Arc;
fn get_stream() -> impl futures::Stream<Item = anyhow::Result<u8>> {
async_stream::try_stream! {
let a = Arc::new(Mutex::new(()));
for item in &[1,2,3] {
{
let mut _guard = a.try_lock().expect("aa");
Err(anyhow::anyhow!("asdf"))?;
}
{
yield *item;
}
}
}
}
#[tokio::main]
async fn main() {
let _s = get_stream().boxed();
}
应该立即删除互斥保护 _guard
,但 rustc
抱怨 get_stream
由于 _guard
而未发送。
error: future cannot be sent between threads safely
--> src/main.rs:22:27
|
22 | let _s = get_stream().boxed();
| ^^^^^ future created by async block is not `Send`
|
= help: within `impl futures::Stream`, the trait `std::marker::Send` is not implemented for `*mut ()`
note: future is not `Send` as this value is used across an await
--> src/main.rs:6:5
|
6 | / async_stream::try_stream! {
7 | | let a = Arc::new(Mutex::new(()));
8 | | for item in &[1,2,3] {
9 | | {
... |
16 | | }
17 | | }
| |_____^ first, await occurs here, with `mut _guard` maybe used later...
note: `mut _guard` is later dropped here
--> src/main.rs:6:5
|
6 | / async_stream::try_stream! {
7 | | let a = Arc::new(Mutex::new(()));
8 | | for item in &[1,2,3] {
9 | | {
10 | | let mut _guard = a.try_lock().expect("aa");
| | ---------- has type `parking_lot::lock_api::MutexGuard<'_, parking_lot::RawMutex, ()>` which is not `Send`
... |
16 | | }
17 | | }
| |_____^
= note: this error originates in the macro `async_stream::try_stream` (in Nightly builds, run with -Z macro-backtrace for more info)
将语句的顺序更改为
Err(anyhow::anyhow!("asdf"))?;
let mut _guard = a.try_lock().expect("aa");
不会编译出错。为什么是这样? anyhow
可能会生成堆栈跟踪,但它会捕获 anyhow::Error
中的局部变量吗?
这里的问题是代码扩展为包含 .await
点的匹配语句,其中 Err
通过用于实现流的通道发送:
let lock = a.try_lock();
match Err({
let error = ::anyhow::private::format_err(
match match () {
() => [],
} {
ref args => unsafe { ::core::fmt::Arguments::new_v1(&["asdf"], args) },
},
);
error
}) {
::core::result::Result::Ok(v) => v,
::core::result::Result::Err(e) => {
__yield_tx.send(::core::result::Result::Err(e.into())).await;
return;
}
};
// ..rest of the function
锁在包括 await
点在内的整个 match
期间都被视为有效,因此您将获得 !Send
未来。
这个问题的一个更简单的例子是:
fn foo() -> impl Future<Output = ()> + Send {
async {
let e: Result<(), ()> = Err(());
let a = Arc::new(Mutex::new(()));
let mut lock = a.lock();
if e.is_err() {
futures::future::ready(e).await;
return;
}
}
}
解决此问题的最直接方法是您在 yield
在函数末尾对项目进行操作之前已经做的事情:将所有锁定封装在它自己的范围内,而不在其中发出任何流项目范围。
这可以通过函数内的范围界定来完成 - 当您想使用 ?
运算符发出错误时会变得很烦人 - 或者通过同步函数或同步闭包在内部锁定互斥锁并在返回之前将其删除。通过这种方式,您可以通过 ?
对结果进行相当符合人体工程学的错误处理:
fn get_stream() -> impl futures::Stream<Item = anyhow::Result<u8>> {
async_stream::try_stream! {
let a = Arc::new(Mutex::new(()));
for item in &[1,2,3] {
let scoped = || {
let lock = a.try_lock().expect("aa");
Err(anyhow::anyhow!("asdf"))?;
Ok::<(), anyhow::Error>(*lock)
};
scoped()?;
yield *item;
}
}
}