`FuturesUnordered` 不满足 `Stream`?
`FuturesUnordered` does not satisfy `Stream`?
我想实现一个基于 FuturesUnordered
的 Stream
,它又应该用 return 类型的 [=15] 计算 async
函数=],但为了简化论证,我们假设它只是一个 Result<f64>
。由于 async fn
s 最终 return Future
s,我假设以下方式将是我必须如何定义我的结构:
use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;
#[pin_project]
pub struct MyDerivedStream<'a> {
#[pin]
from_futures: FuturesUnordered<&'a (dyn Future<Output = Result<f64>> + Send)>,
}
impl Stream for MyDerivedStream<'_> {
type Item = Result<f64>;
fn poll_next(
self: Pin<&mut Self>,
c: &mut std::task::Context<'_>,
) -> Poll<Option<<Self as Stream>::Item>> {
let this = self.project();
this.from_futures.poll_next(c)
}
}
我现在 运行 遇到的问题是,由于某种原因 FuturesUnordered
上的 poll_next
函数由于不满足 Stream
特征边界而无法编译. (在this Playground example上自己看):
error[E0599]: the method `poll_next` exists for struct `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>`, but its trait bounds were not satisfied
--> src/lib.rs:21:27
|
21 | this.from_futures.poll_next(c)
| ^^^^^^^^^ method cannot be called on `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>` due to unsatisfied trait bounds
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.14/src/stream/futures_unordered/mod.rs:55:1
|
55 | pub struct FuturesUnordered<Fut> {
| -------------------------------- doesn't satisfy `_: futures::Stream`
|
= note: the following trait bounds were not satisfied:
`&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send: futures::Future`
which is required by `FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>: futures::Stream`
我很难理解这里的问题。据我所知,FuturesUnordered
确实实现了 Stream
,那么这里的实际问题是什么?是 &'a dyn Future
- 如果是这样,我还需要如何在此处定义类型才能使其正常工作?
&'a dyn Future
没有实现 Future
,这是 impl Stream for FuturesUnordered
所要求的。一种解决方案是将 &'a dyn Future
替换为 Pin<&mut 'a dyn Future>
:
use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;
#[pin_project]
pub struct MyDerivedStream<'a> {
#[pin]
from_futures: FuturesUnordered<Pin<&'a mut(dyn Future<Output = Result<f64>> + Send)>>,
}
impl<'a> Stream for MyDerivedStream<'a> {
type Item = Result<f64>;
fn poll_next(
self: Pin<&mut Self>,
c: &mut std::task::Context<'_>,
) -> Poll<Option<<Self as Stream>::Item>> {
let this = self.project().from_futures;
this.poll_next(c)
}
}
有必要可变地借用 FuturesUnordered
中的项目,这通过检查采用 self: Pin<&mut Self>
的 Futures::poll
函数变得明显。 Stream for FuturesUnordered
的实现需要轮询包装的项目以确定何时可以生成新项目,这对于共享引用是不可能的。
如果 &mut Future
周围没有 Pin
,则可能 mem::replace
包装未来并导致 Future
永远不会被实际轮询。
这是了解更多关于 Pinning 的重要资源:https://fasterthanli.me/articles/pin-and-suffering
我想实现一个基于 FuturesUnordered
的 Stream
,它又应该用 return 类型的 [=15] 计算 async
函数=],但为了简化论证,我们假设它只是一个 Result<f64>
。由于 async fn
s 最终 return Future
s,我假设以下方式将是我必须如何定义我的结构:
use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;
#[pin_project]
pub struct MyDerivedStream<'a> {
#[pin]
from_futures: FuturesUnordered<&'a (dyn Future<Output = Result<f64>> + Send)>,
}
impl Stream for MyDerivedStream<'_> {
type Item = Result<f64>;
fn poll_next(
self: Pin<&mut Self>,
c: &mut std::task::Context<'_>,
) -> Poll<Option<<Self as Stream>::Item>> {
let this = self.project();
this.from_futures.poll_next(c)
}
}
我现在 运行 遇到的问题是,由于某种原因 FuturesUnordered
上的 poll_next
函数由于不满足 Stream
特征边界而无法编译. (在this Playground example上自己看):
error[E0599]: the method `poll_next` exists for struct `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>`, but its trait bounds were not satisfied
--> src/lib.rs:21:27
|
21 | this.from_futures.poll_next(c)
| ^^^^^^^^^ method cannot be called on `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>` due to unsatisfied trait bounds
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.14/src/stream/futures_unordered/mod.rs:55:1
|
55 | pub struct FuturesUnordered<Fut> {
| -------------------------------- doesn't satisfy `_: futures::Stream`
|
= note: the following trait bounds were not satisfied:
`&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send: futures::Future`
which is required by `FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>: futures::Stream`
我很难理解这里的问题。据我所知,FuturesUnordered
确实实现了 Stream
,那么这里的实际问题是什么?是 &'a dyn Future
- 如果是这样,我还需要如何在此处定义类型才能使其正常工作?
&'a dyn Future
没有实现 Future
,这是 impl Stream for FuturesUnordered
所要求的。一种解决方案是将 &'a dyn Future
替换为 Pin<&mut 'a dyn Future>
:
use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;
#[pin_project]
pub struct MyDerivedStream<'a> {
#[pin]
from_futures: FuturesUnordered<Pin<&'a mut(dyn Future<Output = Result<f64>> + Send)>>,
}
impl<'a> Stream for MyDerivedStream<'a> {
type Item = Result<f64>;
fn poll_next(
self: Pin<&mut Self>,
c: &mut std::task::Context<'_>,
) -> Poll<Option<<Self as Stream>::Item>> {
let this = self.project().from_futures;
this.poll_next(c)
}
}
有必要可变地借用 FuturesUnordered
中的项目,这通过检查采用 self: Pin<&mut Self>
的 Futures::poll
函数变得明显。 Stream for FuturesUnordered
的实现需要轮询包装的项目以确定何时可以生成新项目,这对于共享引用是不可能的。
如果 &mut Future
周围没有 Pin
,则可能 mem::replace
包装未来并导致 Future
永远不会被实际轮询。
这是了解更多关于 Pinning 的重要资源:https://fasterthanli.me/articles/pin-and-suffering