Rust 从未来创造一个水槽

Rust create a sink from a future

我想将一些 reader/writer 转换为元素的记录管道。我按照 使用 futures::stream::unfold 成功地使 reader-> 流方向。但是,我在使用 sink->writer 时遇到了麻烦。我基本上是在寻找 unfold.

的一些反函数

我知道有 AsyncWriterExt::into_sink 但这只有在我可以生成所有字节以在一批中写入时才有效。我还发现 建议在 with() 之后使用 drain()。然而,由于生命周期问题,这没有用(FnMut 无法有效地存储对作者的引用,或者至少我没有设法做到这一点。

所以我正在寻找的是一些我可以像 fold(initial_state, |element| {(writer.write(element).await, new_state)}) 这样调用的函数。你明白了(我希望)。

我也看到了 async_codec 但这对我来说似乎有点过分了。与此同时,我求助于将所有写入存储为流,然后使用 writer.into_sink().with_flat_map()。但这真的很难看。

编辑:我显然不是唯一想要这个的人,请参阅 upstream implementation。未来的用户 (呵呵) 将能够简单地使用 futures::sink::unfold.


好吧,我鼓起勇气,根据 futures::stream::unfold:

一起破解了一些东西
fn fold<T, F, Fut, Item, E>(init: T, f: F) -> FoldSink<T, F, Fut>
where
    F: FnMut(T, Item) -> Fut,
    Fut: Future<Output = Result<T, E>>
{
    FoldSink {
        f,
        state: Some(init),
        fut: None,
    }
}


use pin_project::pin_project;

#[pin_project]
struct FoldSink<T, F, Fut> {
    f: F,
    state: Option<T>,
    #[pin]
    fut: Option<Fut>,
}

impl <T, Item, F, Fut, E> futures::sink::Sink<Item> for FoldSink<T, F, Fut>
where
    F: FnMut(T, Item) -> Fut,
    Fut: Future<Output = Result<T, E>>
{
    type Error = E;

    fn poll_ready(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        let mut this = self.project();

        match this.fut.as_mut().as_pin_mut() {
            Some(fut) => {
                match fut.poll(ctx) {
                    Poll::Ready(Ok(new_state)) => {
                        this.fut.set(None);
                        *this.state = Some(new_state);
                        Poll::Ready(Ok(()))
                    },
                    Poll::Ready(Err(e)) => {
                        this.fut.set(None);
                        Poll::Ready(Err(e))
                    },
                    Poll::Pending => Poll::Pending,
                }
            },
            None => {
                Poll::Ready(Ok(()))
            }
        }
    }

    fn start_send(self: std::pin::Pin<&mut Self>, item: Item) -> Result<(), E> {
        let mut this = self.project();
        this.fut.set(Some((this.f)(this.state.take().expect("todo invalid state"), item)));
        Ok(())
    }

    fn poll_flush(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        self.poll_ready(ctx)
    }

    fn poll_close(mut self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        futures::ready!(self.as_mut().poll_ready(ctx))?;
        let this = self.project();
        this.state.take().unwrap();
        Poll::Ready(Ok(()))
    }
}

欢迎提出意见和改进!