Rust 从未来创造一个水槽
Rust create a sink from a future
我想将一些 reader/writer 转换为元素的记录管道。我按照 使用 futures::stream::unfold
成功地使 reader-> 流方向。但是,我在使用 sink->writer 时遇到了麻烦。我基本上是在寻找 unfold
.
的一些反函数
我知道有 AsyncWriterExt::into_sink
但这只有在我可以生成所有字节以在一批中写入时才有效。我还发现 建议在 with()
之后使用 drain()
。然而,由于生命周期问题,这没有用(FnMut
无法有效地存储对作者的引用,或者至少我没有设法做到这一点。
所以我正在寻找的是一些我可以像 fold(initial_state, |element| {(writer.write(element).await, new_state)})
这样调用的函数。你明白了(我希望)。
我也看到了 async_codec
但这对我来说似乎有点过分了。与此同时,我求助于将所有写入存储为流,然后使用 writer.into_sink().with_flat_map()
。但这真的很难看。
编辑:我显然不是唯一想要这个的人,请参阅 upstream implementation。未来的用户 (呵呵) 将能够简单地使用 futures::sink::unfold
.
好吧,我鼓起勇气,根据 futures::stream::unfold
:
一起破解了一些东西
fn fold<T, F, Fut, Item, E>(init: T, f: F) -> FoldSink<T, F, Fut>
where
F: FnMut(T, Item) -> Fut,
Fut: Future<Output = Result<T, E>>
{
FoldSink {
f,
state: Some(init),
fut: None,
}
}
use pin_project::pin_project;
#[pin_project]
struct FoldSink<T, F, Fut> {
f: F,
state: Option<T>,
#[pin]
fut: Option<Fut>,
}
impl <T, Item, F, Fut, E> futures::sink::Sink<Item> for FoldSink<T, F, Fut>
where
F: FnMut(T, Item) -> Fut,
Fut: Future<Output = Result<T, E>>
{
type Error = E;
fn poll_ready(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
let mut this = self.project();
match this.fut.as_mut().as_pin_mut() {
Some(fut) => {
match fut.poll(ctx) {
Poll::Ready(Ok(new_state)) => {
this.fut.set(None);
*this.state = Some(new_state);
Poll::Ready(Ok(()))
},
Poll::Ready(Err(e)) => {
this.fut.set(None);
Poll::Ready(Err(e))
},
Poll::Pending => Poll::Pending,
}
},
None => {
Poll::Ready(Ok(()))
}
}
}
fn start_send(self: std::pin::Pin<&mut Self>, item: Item) -> Result<(), E> {
let mut this = self.project();
this.fut.set(Some((this.f)(this.state.take().expect("todo invalid state"), item)));
Ok(())
}
fn poll_flush(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
self.poll_ready(ctx)
}
fn poll_close(mut self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
futures::ready!(self.as_mut().poll_ready(ctx))?;
let this = self.project();
this.state.take().unwrap();
Poll::Ready(Ok(()))
}
}
欢迎提出意见和改进!
我想将一些 reader/writer 转换为元素的记录管道。我按照 futures::stream::unfold
成功地使 reader-> 流方向。但是,我在使用 sink->writer 时遇到了麻烦。我基本上是在寻找 unfold
.
我知道有 AsyncWriterExt::into_sink
但这只有在我可以生成所有字节以在一批中写入时才有效。我还发现 with()
之后使用 drain()
。然而,由于生命周期问题,这没有用(FnMut
无法有效地存储对作者的引用,或者至少我没有设法做到这一点。
所以我正在寻找的是一些我可以像 fold(initial_state, |element| {(writer.write(element).await, new_state)})
这样调用的函数。你明白了(我希望)。
我也看到了 async_codec
但这对我来说似乎有点过分了。与此同时,我求助于将所有写入存储为流,然后使用 writer.into_sink().with_flat_map()
。但这真的很难看。
编辑:我显然不是唯一想要这个的人,请参阅 upstream implementation。未来的用户 (呵呵) 将能够简单地使用 futures::sink::unfold
.
好吧,我鼓起勇气,根据 futures::stream::unfold
:
fn fold<T, F, Fut, Item, E>(init: T, f: F) -> FoldSink<T, F, Fut>
where
F: FnMut(T, Item) -> Fut,
Fut: Future<Output = Result<T, E>>
{
FoldSink {
f,
state: Some(init),
fut: None,
}
}
use pin_project::pin_project;
#[pin_project]
struct FoldSink<T, F, Fut> {
f: F,
state: Option<T>,
#[pin]
fut: Option<Fut>,
}
impl <T, Item, F, Fut, E> futures::sink::Sink<Item> for FoldSink<T, F, Fut>
where
F: FnMut(T, Item) -> Fut,
Fut: Future<Output = Result<T, E>>
{
type Error = E;
fn poll_ready(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
let mut this = self.project();
match this.fut.as_mut().as_pin_mut() {
Some(fut) => {
match fut.poll(ctx) {
Poll::Ready(Ok(new_state)) => {
this.fut.set(None);
*this.state = Some(new_state);
Poll::Ready(Ok(()))
},
Poll::Ready(Err(e)) => {
this.fut.set(None);
Poll::Ready(Err(e))
},
Poll::Pending => Poll::Pending,
}
},
None => {
Poll::Ready(Ok(()))
}
}
}
fn start_send(self: std::pin::Pin<&mut Self>, item: Item) -> Result<(), E> {
let mut this = self.project();
this.fut.set(Some((this.f)(this.state.take().expect("todo invalid state"), item)));
Ok(())
}
fn poll_flush(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
self.poll_ready(ctx)
}
fn poll_close(mut self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
futures::ready!(self.as_mut().poll_ready(ctx))?;
let this = self.project();
this.state.take().unwrap();
Poll::Ready(Ok(()))
}
}
欢迎提出意见和改进!