`FuturesUnordered` 不满足 `Stream`?

`FuturesUnordered` does not satisfy `Stream`?

我想实现一个基于 FuturesUnorderedStream,它又应该用 return 类型的 [=15] 计算 async 函数=],但为了简化论证,我们假设它只是一个 Result<f64>。由于 async fns 最终 return Futures,我假设以下方式将是我必须如何定义我的结构:

use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;

#[pin_project]
pub struct MyDerivedStream<'a> {
    #[pin]
    from_futures: FuturesUnordered<&'a (dyn Future<Output = Result<f64>> + Send)>,
}

impl Stream for MyDerivedStream<'_> {
    type Item = Result<f64>;

    fn poll_next(
        self: Pin<&mut Self>,
        c: &mut std::task::Context<'_>,
    ) -> Poll<Option<<Self as Stream>::Item>> {
        let this = self.project();

        this.from_futures.poll_next(c)
    }
}

我现在 运行 遇到的问题是,由于某种原因 FuturesUnordered 上的 poll_next 函数由于不满足 Stream 特征边界而无法编译. (在this Playground example上自己看):

error[E0599]: the method `poll_next` exists for struct `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>`, but its trait bounds were not satisfied
  --> src/lib.rs:21:27
   |
21 |         this.from_futures.poll_next(c)
   |                           ^^^^^^^^^ method cannot be called on `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>` due to unsatisfied trait bounds
   | 
  ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.14/src/stream/futures_unordered/mod.rs:55:1
   |
55 | pub struct FuturesUnordered<Fut> {
   | -------------------------------- doesn't satisfy `_: futures::Stream`
   |
   = note: the following trait bounds were not satisfied:
           `&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send: futures::Future`
           which is required by `FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>: futures::Stream`

我很难理解这里的问题。据我所知,FuturesUnordered 确实实现了 Stream,那么这里的实际问题是什么?是 &'a dyn Future - 如果是这样,我还需要如何在此处定义类型才能使其正常工作?

&'a dyn Future 没有实现 Future,这是 impl Stream for FuturesUnordered 所要求的。一种解决方案是将 &'a dyn Future 替换为 Pin<&mut 'a dyn Future>:

use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;

#[pin_project]
pub struct MyDerivedStream<'a> {
    #[pin]
    from_futures: FuturesUnordered<Pin<&'a mut(dyn Future<Output = Result<f64>> + Send)>>,
}

impl<'a> Stream for MyDerivedStream<'a> {
    type Item = Result<f64>;

    fn poll_next(
        self: Pin<&mut Self>,
        c: &mut std::task::Context<'_>,
    ) -> Poll<Option<<Self as Stream>::Item>> {
        let this = self.project().from_futures;

        this.poll_next(c)
    }
}

有必要可变地借用 FuturesUnordered 中的项目,这通过检查采用 self: Pin<&mut Self>Futures::poll 函数变得明显。 Stream for FuturesUnordered 的实现需要轮询包装的项目以确定何时可以生成新项目,这对于共享引用是不可能的。

如果 &mut Future 周围没有 Pin,则可能 mem::replace 包装未来并导致 Future 永远不会被实际轮询。

这是了解更多关于 Pinning 的重要资源:https://fasterthanli.me/articles/pin-and-suffering