是否有用于异步流的 tuple_windows() 适配器?

Is there a tuple_windows() adapter for async streams?

我有一个看起来像这样的代码

use itertools::Itertools;

let (tx, rx) = std::sync::mpsc::channel();

tokio::spawn(async move {
    for (v1, v2) in rx.into_iter().tuple_windows() {
        // do some computation
    }
}

for v in (0..) {
    tx.send(v).unwrap();
}

当我将频道更改为 tokio::mpsc::channel() 时,rx 变为异步流(即 futures::Stream),它没有 .tuple_windows() 适配器

您是否知道为 Streams 提供与 Itertools 类似功能的板条箱?如果没有,您建议如何执行此操作?

Futures 有一个 StreamExt。 那里没有 windows 功能,但您可以使用它来实现自己的扩展。

某事喜欢:

use async_trait::async_trait;
use futures::stream::StreamExt;
use std::pin::Pin;

#[async_trait]
trait TuplesWindowsExt: StreamExt + Unpin {
    async fn tuples(
        self: &mut Pin<Box<Self>>,
    ) -> (
        Option<<Self as futures::Stream>::Item>,
        Option<<Self as futures::Stream>::Item>,
    )
    where
        <Self as futures::Stream>::Item: Send,
    {
        let a = self.next().await;
        let b = self.next().await;
        (a, b)
    }
}

Playground

OP 在这里,我最终遵循@Netwave 的回答并实现了我自己的扩展,实际实现有点不同,以便产生滑动 windows(如 .tuple_windows() 用于 Itertools): [0, 1, 2, 3] -> [(0, 1), (1, 2), (2, 3)]

这不是微不足道的,所以这里是为任何可能需要它的人准备的

impl<T: Stream> TupleWindowsExt for T {}
trait TupleWindowsExt: Stream {
    fn tuple_windows(self) -> TupleWindows<Self>
    where
        Self: Sized,
    {
        TupleWindows::new(self)
    }
}

pin_project! {
    #[derive(Debug)]
    struct TupleWindows<S: Stream> {
        #[pin]
        stream: S,
        previous: Option<S::Item>,
    }
}

impl<S: Stream> TupleWindows<S> {
    pub fn new(stream: S) -> Self {
        Self {
            stream,
            previous: None,
        }
    }
}

impl<S: Stream> Stream for TupleWindows<S>
where
    S::Item: Clone,
{
    type Item = (S::Item, S::Item);

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
            Some(next) => next,
            None => return Poll::Ready(None),
        };

        if let Some(previous) = this.previous {
            let res = (previous.clone(), current.clone());
            *this.previous = Some(current);
            Poll::Ready(Some(res))
        } else {
            let next = match this.stream.poll_next(cx) {
                Poll::Ready(next) => next,
                Poll::Pending => {
                    *this.previous = Some(current);
                    return Poll::Pending;
                }
            };
            *this.previous = next.clone();
            Poll::Ready(next.map(|next| (current, next)))
        }
    }
}