如何将 Future 转换为 Stream?

How to convert a Future into a Stream?

我正在尝试使用 async_std 从网络接收 UDP 数据报。

有一个 UdpSocket 实现了 async recv_from,这个方法 returns 一个未来,但我需要一个 async_std::stream::Stream 来提供 UDP 数据报流,因为它是一个更好的抽象。

我发现 tokio::net::UdpFramed 完全符合我的需要,但它在当前版本的 tokio 中不可用。

一般来说,问题是如何将 Futures 从给定的异步函数转换为 Stream

对于单个项目,使用 FutureExt::into_stream:

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    inner().into_stream()
}

async fn inner() -> i32 {
    42
}

对于闭包生成的来自多个期货的流,使用 stream::unfold:

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    stream::unfold((), |()| async { Some((inner().await, ())) })
}

async fn inner() -> i32 {
    42
}

在你的情况下,你可以使用 stream::unfold:

use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1

fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
    stream::unfold(s, |s| {
        async {
            let data = read_one(&s).await;
            Some((data, s))
        }
    })
}

async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
    let mut data = vec![0; 1024];
    let (len, _) = s.recv_from(&mut data).await?;
    data.truncate(len);
    Ok(data)
}

#[async_std::main]
async fn main() -> io::Result<()> {
    let s = UdpSocket::bind("0.0.0.0:9876").await?;

    read_many(s)
        .for_each(|d| {
            async {
                match d {
                    Ok(d) => match std::str::from_utf8(&d) {
                        Ok(s) => println!("{}", s),
                        Err(_) => println!("{:x?}", d),
                    },
                    Err(e) => eprintln!("Error: {}", e),
                }
            }
        })
        .await;

    Ok(())
}

Generally speaking the question is how do I convert Futures from a given async function into a Stream?

FutureExt::into_stream,但不要被名字骗了;它不适合你的情况。

There is a UdpSocket that implements async recv_from, this method returns a future but I need a async_std::stream::Stream that gives a stream of UDP datagrams because it is a better abstraction.

这里不一定更好的抽象。

具体来说,async-stdUdpSocket::recv_from returns 输出类型为 (usize, SocketAddr) 的未来——接收数据的大小和对等地址。如果您要使用 into_stream 将其转换为流,它只会给您那个,而不是收到的数据。

I've found tokio::net::UdpFramed that does exactly what I need but it is not available in current versions of tokio.

已移至 tokio-util crate. Unfortunately, you can't (easily) use that either. It requires a tokio::net::UdpSocket, which is not the same as async_std::net::UdpSocket

当然,您可以使用 futures::stream::poll_fn or futures::stream::unfold to give UdpSocket::recv_from a futures::stream::Stream facade, but then what will you do with that? If you end up calling StreamExt::next 等期货实用函数从中轮询一个值,您可以直接使用 recv_from

如果您使用的某些 API 需要 Stream 输入,例如 rusoto:

,则只需要使用 Stream