是否有用于异步流的 tuple_windows() 适配器?
Is there a tuple_windows() adapter for async streams?
我有一个看起来像这样的代码
use itertools::Itertools;
let (tx, rx) = std::sync::mpsc::channel();
tokio::spawn(async move {
for (v1, v2) in rx.into_iter().tuple_windows() {
// do some computation
}
}
for v in (0..) {
tx.send(v).unwrap();
}
当我将频道更改为 tokio::mpsc::channel()
时,rx
变为异步流(即 futures::Stream
),它没有 .tuple_windows()
适配器
您是否知道为 Stream
s 提供与 Itertools
类似功能的板条箱?如果没有,您建议如何执行此操作?
Futures 有一个 StreamExt
。
那里没有 windows 功能,但您可以使用它来实现自己的扩展。
某事喜欢:
use async_trait::async_trait;
use futures::stream::StreamExt;
use std::pin::Pin;
#[async_trait]
trait TuplesWindowsExt: StreamExt + Unpin {
async fn tuples(
self: &mut Pin<Box<Self>>,
) -> (
Option<<Self as futures::Stream>::Item>,
Option<<Self as futures::Stream>::Item>,
)
where
<Self as futures::Stream>::Item: Send,
{
let a = self.next().await;
let b = self.next().await;
(a, b)
}
}
OP 在这里,我最终遵循@Netwave 的回答并实现了我自己的扩展,实际实现有点不同,以便产生滑动 windows(如 .tuple_windows()
用于 Itertools
): [0, 1, 2, 3]
-> [(0, 1), (1, 2), (2, 3)]
这不是微不足道的,所以这里是为任何可能需要它的人准备的
impl<T: Stream> TupleWindowsExt for T {}
trait TupleWindowsExt: Stream {
fn tuple_windows(self) -> TupleWindows<Self>
where
Self: Sized,
{
TupleWindows::new(self)
}
}
pin_project! {
#[derive(Debug)]
struct TupleWindows<S: Stream> {
#[pin]
stream: S,
previous: Option<S::Item>,
}
}
impl<S: Stream> TupleWindows<S> {
pub fn new(stream: S) -> Self {
Self {
stream,
previous: None,
}
}
}
impl<S: Stream> Stream for TupleWindows<S>
where
S::Item: Clone,
{
type Item = (S::Item, S::Item);
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(next) => next,
None => return Poll::Ready(None),
};
if let Some(previous) = this.previous {
let res = (previous.clone(), current.clone());
*this.previous = Some(current);
Poll::Ready(Some(res))
} else {
let next = match this.stream.poll_next(cx) {
Poll::Ready(next) => next,
Poll::Pending => {
*this.previous = Some(current);
return Poll::Pending;
}
};
*this.previous = next.clone();
Poll::Ready(next.map(|next| (current, next)))
}
}
}
我有一个看起来像这样的代码
use itertools::Itertools;
let (tx, rx) = std::sync::mpsc::channel();
tokio::spawn(async move {
for (v1, v2) in rx.into_iter().tuple_windows() {
// do some computation
}
}
for v in (0..) {
tx.send(v).unwrap();
}
当我将频道更改为 tokio::mpsc::channel()
时,rx
变为异步流(即 futures::Stream
),它没有 .tuple_windows()
适配器
您是否知道为 Stream
s 提供与 Itertools
类似功能的板条箱?如果没有,您建议如何执行此操作?
Futures 有一个 StreamExt
。
那里没有 windows 功能,但您可以使用它来实现自己的扩展。
某事喜欢:
use async_trait::async_trait;
use futures::stream::StreamExt;
use std::pin::Pin;
#[async_trait]
trait TuplesWindowsExt: StreamExt + Unpin {
async fn tuples(
self: &mut Pin<Box<Self>>,
) -> (
Option<<Self as futures::Stream>::Item>,
Option<<Self as futures::Stream>::Item>,
)
where
<Self as futures::Stream>::Item: Send,
{
let a = self.next().await;
let b = self.next().await;
(a, b)
}
}
OP 在这里,我最终遵循@Netwave 的回答并实现了我自己的扩展,实际实现有点不同,以便产生滑动 windows(如 .tuple_windows()
用于 Itertools
): [0, 1, 2, 3]
-> [(0, 1), (1, 2), (2, 3)]
这不是微不足道的,所以这里是为任何可能需要它的人准备的
impl<T: Stream> TupleWindowsExt for T {}
trait TupleWindowsExt: Stream {
fn tuple_windows(self) -> TupleWindows<Self>
where
Self: Sized,
{
TupleWindows::new(self)
}
}
pin_project! {
#[derive(Debug)]
struct TupleWindows<S: Stream> {
#[pin]
stream: S,
previous: Option<S::Item>,
}
}
impl<S: Stream> TupleWindows<S> {
pub fn new(stream: S) -> Self {
Self {
stream,
previous: None,
}
}
}
impl<S: Stream> Stream for TupleWindows<S>
where
S::Item: Clone,
{
type Item = (S::Item, S::Item);
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(next) => next,
None => return Poll::Ready(None),
};
if let Some(previous) = this.previous {
let res = (previous.clone(), current.clone());
*this.previous = Some(current);
Poll::Ready(Some(res))
} else {
let next = match this.stream.poll_next(cx) {
Poll::Ready(next) => next,
Poll::Pending => {
*this.previous = Some(current);
return Poll::Pending;
}
};
*this.previous = next.clone();
Poll::Ready(next.map(|next| (current, next)))
}
}
}