如何将 Future 转换为 Stream?
How to convert a Future into a Stream?
我正在尝试使用 async_std
从网络接收 UDP 数据报。
有一个 UdpSocket
实现了 async recv_from
,这个方法 returns 一个未来,但我需要一个 async_std::stream::Stream
来提供 UDP 数据报流,因为它是一个更好的抽象。
我发现 tokio::net::UdpFramed
完全符合我的需要,但它在当前版本的 tokio 中不可用。
一般来说,问题是如何将 Future
s 从给定的异步函数转换为 Stream
?
对于单个项目,使用 FutureExt::into_stream
:
use futures::prelude::*; // 0.3.1
fn outer() -> impl Stream<Item = i32> {
inner().into_stream()
}
async fn inner() -> i32 {
42
}
对于闭包生成的来自多个期货的流,使用 stream::unfold
:
use futures::prelude::*; // 0.3.1
fn outer() -> impl Stream<Item = i32> {
stream::unfold((), |()| async { Some((inner().await, ())) })
}
async fn inner() -> i32 {
42
}
在你的情况下,你可以使用 stream::unfold
:
use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1
fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
stream::unfold(s, |s| {
async {
let data = read_one(&s).await;
Some((data, s))
}
})
}
async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
let mut data = vec![0; 1024];
let (len, _) = s.recv_from(&mut data).await?;
data.truncate(len);
Ok(data)
}
#[async_std::main]
async fn main() -> io::Result<()> {
let s = UdpSocket::bind("0.0.0.0:9876").await?;
read_many(s)
.for_each(|d| {
async {
match d {
Ok(d) => match std::str::from_utf8(&d) {
Ok(s) => println!("{}", s),
Err(_) => println!("{:x?}", d),
},
Err(e) => eprintln!("Error: {}", e),
}
}
})
.await;
Ok(())
}
Generally speaking the question is how do I convert Futures
from a given async function into a Stream
?
有 FutureExt::into_stream
,但不要被名字骗了;它不适合你的情况。
There is a UdpSocket
that implements async recv_from
, this method returns a future but I need a async_std::stream::Stream
that gives a stream of UDP datagrams because it is a better abstraction.
这里不一定更好的抽象。
具体来说,async-std
的 UdpSocket::recv_from
returns 输出类型为 (usize, SocketAddr)
的未来——接收数据的大小和对等地址。如果您要使用 into_stream
将其转换为流,它只会给您那个,而不是收到的数据。
I've found tokio::net::UdpFramed
that does exactly what I need but it is not available in current versions of tokio.
已移至 tokio-util
crate. Unfortunately, you can't (easily) use that either. It requires a tokio::net::UdpSocket
, which is not the same as async_std::net::UdpSocket
。
当然,您可以使用 futures::stream::poll_fn
or futures::stream::unfold
to give UdpSocket::recv_from
a futures::stream::Stream
facade, but then what will you do with that? If you end up calling StreamExt::next
等期货实用函数从中轮询一个值,您可以直接使用 recv_from
。
如果您使用的某些 API 需要 Stream
输入,例如 rusoto
:
,则只需要使用 Stream
我正在尝试使用 async_std
从网络接收 UDP 数据报。
有一个 UdpSocket
实现了 async recv_from
,这个方法 returns 一个未来,但我需要一个 async_std::stream::Stream
来提供 UDP 数据报流,因为它是一个更好的抽象。
我发现 tokio::net::UdpFramed
完全符合我的需要,但它在当前版本的 tokio 中不可用。
一般来说,问题是如何将 Future
s 从给定的异步函数转换为 Stream
?
对于单个项目,使用 FutureExt::into_stream
:
use futures::prelude::*; // 0.3.1
fn outer() -> impl Stream<Item = i32> {
inner().into_stream()
}
async fn inner() -> i32 {
42
}
对于闭包生成的来自多个期货的流,使用 stream::unfold
:
use futures::prelude::*; // 0.3.1
fn outer() -> impl Stream<Item = i32> {
stream::unfold((), |()| async { Some((inner().await, ())) })
}
async fn inner() -> i32 {
42
}
在你的情况下,你可以使用 stream::unfold
:
use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1
fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
stream::unfold(s, |s| {
async {
let data = read_one(&s).await;
Some((data, s))
}
})
}
async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
let mut data = vec![0; 1024];
let (len, _) = s.recv_from(&mut data).await?;
data.truncate(len);
Ok(data)
}
#[async_std::main]
async fn main() -> io::Result<()> {
let s = UdpSocket::bind("0.0.0.0:9876").await?;
read_many(s)
.for_each(|d| {
async {
match d {
Ok(d) => match std::str::from_utf8(&d) {
Ok(s) => println!("{}", s),
Err(_) => println!("{:x?}", d),
},
Err(e) => eprintln!("Error: {}", e),
}
}
})
.await;
Ok(())
}
Generally speaking the question is how do I convert
Futures
from a given async function into aStream
?
有 FutureExt::into_stream
,但不要被名字骗了;它不适合你的情况。
There is a
UdpSocket
that implements asyncrecv_from
, this method returns a future but I need aasync_std::stream::Stream
that gives a stream of UDP datagrams because it is a better abstraction.
这里不一定更好的抽象。
具体来说,async-std
的 UdpSocket::recv_from
returns 输出类型为 (usize, SocketAddr)
的未来——接收数据的大小和对等地址。如果您要使用 into_stream
将其转换为流,它只会给您那个,而不是收到的数据。
I've found
tokio::net::UdpFramed
that does exactly what I need but it is not available in current versions of tokio.
已移至 tokio-util
crate. Unfortunately, you can't (easily) use that either. It requires a tokio::net::UdpSocket
, which is not the same as async_std::net::UdpSocket
。
当然,您可以使用 futures::stream::poll_fn
or futures::stream::unfold
to give UdpSocket::recv_from
a futures::stream::Stream
facade, but then what will you do with that? If you end up calling StreamExt::next
等期货实用函数从中轮询一个值,您可以直接使用 recv_from
。
如果您使用的某些 API 需要 Stream
输入,例如 rusoto
:
Stream