在编译时等待一些未知的未来
Awaiting a Number of Futures Unknown at Compile Time
我想利用 Tokio 的 运行time 来处理可变数量的异步期货。由于在编译时期货的数量是未知的,似乎 FuturesUnordered is my best option (macros such as select!
require specifying your branches at compile time; join_all 是可能的,但文档在“很多情况下”推荐 FuturesUnordered,当顺序无关紧要时)。
这个片段的逻辑是一个 recv() 循环被推送到 futures 桶中,它应该总是 运行。当新数据到达时,它的 parsing/processing 也被推送到 futures 桶中(而不是立即处理)。这确保接收器在响应新事件时保持低延迟,并且数据处理(可能需要大量计算的解密)与所有其他数据处理异步块(加上侦听接收器)同时发生。
This thread 顺便解释一下为什么期货得到 .boxed()
。
问题是这个神秘的错误:
error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
--> src/main.rs:27:8
|
27 | }).boxed());
| ^^^^^ `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
|
= help: the trait `Sync` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
= note: required because of the requirements on the impl of `Sync` for `Unique<dyn futures::Future<Output = ()> + std::marker::Send>`
= note: required because it appears within the type `Box<dyn futures::Future<Output = ()> + std::marker::Send>`
= note: required because it appears within the type `Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>`
= note: required because of the requirements on the impl of `Sync` for `FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
= note: required because of the requirements on the impl of `std::marker::Send` for `&FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
= note: required because it appears within the type `[static generator@src/main.rs:16:25: 27:6 _]`
= note: required because it appears within the type `from_generator::GenFuture<[static generator@src/main.rs:16:25: 27:6 _]>`
= note: required because it appears within the type `impl futures::Future`
它看起来像“递归地”推送到 UnorderedFutures(我猜不是真的,但你还能怎么称呼它?)不起作用,但我不是确定为什么。此错误表明 FuturesUnordered
趋向于 Box'd & Pin'd 异步块不满足某些 Sync
特征要求——我猜这个要求只是因为 &FuturesUnordered
(在 futures.push(...)
期间使用,因为该方法借用了 &self)需要它来实现其 Send
特征...或其他什么?
use std::error::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};
use futures::stream::futures_unordered::FuturesUnordered;
use futures::FutureExt;
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let mut futures = FuturesUnordered::new();
let (tx, rx) = mpsc::channel(32);
tokio::spawn( foo(tx) ); // Only the receiver is relevant; its transmitter is
// elsewhere, occasionally sending data.
futures.push((async { // <--- NOTE: futures.push()
loop {
match rx.recv().await {
Some(data) => {
futures.push((async move { // <--- NOTE: nested futures.push()
let _ = data; // TODO: replace with code that processes 'data'
}).boxed());
},
None => {}
}
}
}).boxed());
while let Some(_) = futures.next().await {}
Ok(())
}
我将把低级错误留给另一个答案,但我相信解决高级问题的更惯用的方法是将 FuturesUnordered
与 [=12 之类的东西结合使用=]如下:
use tokio::sync::mpsc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
#[tokio::main]
pub async fn main() {
let mut futures = FuturesUnordered::new();
let (tx, mut rx) = mpsc::channel(32);
//turn foo into something more concrete
tokio::spawn(async move {
let _ = tx.send(42i32).await;
});
loop {
tokio::select! {
Some(data) = rx.recv() => {
futures.push(async move {
data.to_string()
});
},
Some(result) = futures.next() => {
println!("{}", result)
},
else => break,
}
}
}
您可以在此处阅读有关 select 宏的更多信息:https://tokio.rs/tokio/tutorial/select
当您使用 boxed
方法将异步块创建的 future 装箱时,您试图将其强制转换为 dyn Future + Send
:
pub fn boxed<'a>(
self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>
然而,创造的未来不是Send
。为什么?因为在它里面,你试图推到FuturesUnordered
,借用它:
pub fn push(&self, future: Fut)
这意味着 async
块捕获了一个 &FuturesUnordered
。对于一个类型 Send
,它的所有字段必须是 Send
,因此对于生成的未来 Send
,&FuturesUnordered
必须是 Send
.
要使引用成为 Send
,类型也必须是 Sync
:
impl<'_, T> Send for &'_ T where
T: Sync
并且 FuturesUnordered
为 Sync
,存储的期货也必须为 Sync
:
impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}
但是boxed
返回的future不一定是Sync
:
pub fn boxed<'a>(
self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>
这意味着异步生成器不是 Send
,因此您无法将其强制转换为 dyn Future + Send
,并且您会收到一条令人困惑的错误消息。
解决方法是添加一个Sync
绑定到未来,然后手动Box::pin
:
type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
let mut futures = FuturesUnordered::<BoxedFuture>::new();
futures.push(Box::pin(async {
loop {
match rx.recv().await {
Some(data) => {
futures.push(Box::pin(async move {
let _ = data;
}));
}
None => {}
}
}
}));
但是,您随后会运行陷入一堆借贷问题。更好的解决方案是使用 tokio::select!
而不是外部 push
,正如 Michael 的回答所解释的那样。
我想利用 Tokio 的 运行time 来处理可变数量的异步期货。由于在编译时期货的数量是未知的,似乎 FuturesUnordered is my best option (macros such as select!
require specifying your branches at compile time; join_all 是可能的,但文档在“很多情况下”推荐 FuturesUnordered,当顺序无关紧要时)。
这个片段的逻辑是一个 recv() 循环被推送到 futures 桶中,它应该总是 运行。当新数据到达时,它的 parsing/processing 也被推送到 futures 桶中(而不是立即处理)。这确保接收器在响应新事件时保持低延迟,并且数据处理(可能需要大量计算的解密)与所有其他数据处理异步块(加上侦听接收器)同时发生。
This thread 顺便解释一下为什么期货得到 .boxed()
。
问题是这个神秘的错误:
error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely --> src/main.rs:27:8 | 27 | }).boxed()); | ^^^^^ `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely | = help: the trait `Sync` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send` = note: required because of the requirements on the impl of `Sync` for `Unique<dyn futures::Future<Output = ()> + std::marker::Send>` = note: required because it appears within the type `Box<dyn futures::Future<Output = ()> + std::marker::Send>` = note: required because it appears within the type `Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>` = note: required because of the requirements on the impl of `Sync` for `FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>` = note: required because of the requirements on the impl of `std::marker::Send` for `&FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>` = note: required because it appears within the type `[static generator@src/main.rs:16:25: 27:6 _]` = note: required because it appears within the type `from_generator::GenFuture<[static generator@src/main.rs:16:25: 27:6 _]>` = note: required because it appears within the type `impl futures::Future`
它看起来像“递归地”推送到 UnorderedFutures(我猜不是真的,但你还能怎么称呼它?)不起作用,但我不是确定为什么。此错误表明 FuturesUnordered
趋向于 Box'd & Pin'd 异步块不满足某些 Sync
特征要求——我猜这个要求只是因为 &FuturesUnordered
(在 futures.push(...)
期间使用,因为该方法借用了 &self)需要它来实现其 Send
特征...或其他什么?
use std::error::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};
use futures::stream::futures_unordered::FuturesUnordered;
use futures::FutureExt;
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let mut futures = FuturesUnordered::new();
let (tx, rx) = mpsc::channel(32);
tokio::spawn( foo(tx) ); // Only the receiver is relevant; its transmitter is
// elsewhere, occasionally sending data.
futures.push((async { // <--- NOTE: futures.push()
loop {
match rx.recv().await {
Some(data) => {
futures.push((async move { // <--- NOTE: nested futures.push()
let _ = data; // TODO: replace with code that processes 'data'
}).boxed());
},
None => {}
}
}
}).boxed());
while let Some(_) = futures.next().await {}
Ok(())
}
我将把低级错误留给另一个答案,但我相信解决高级问题的更惯用的方法是将 FuturesUnordered
与 [=12 之类的东西结合使用=]如下:
use tokio::sync::mpsc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
#[tokio::main]
pub async fn main() {
let mut futures = FuturesUnordered::new();
let (tx, mut rx) = mpsc::channel(32);
//turn foo into something more concrete
tokio::spawn(async move {
let _ = tx.send(42i32).await;
});
loop {
tokio::select! {
Some(data) = rx.recv() => {
futures.push(async move {
data.to_string()
});
},
Some(result) = futures.next() => {
println!("{}", result)
},
else => break,
}
}
}
您可以在此处阅读有关 select 宏的更多信息:https://tokio.rs/tokio/tutorial/select
当您使用 boxed
方法将异步块创建的 future 装箱时,您试图将其强制转换为 dyn Future + Send
:
pub fn boxed<'a>(
self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>
然而,创造的未来不是Send
。为什么?因为在它里面,你试图推到FuturesUnordered
,借用它:
pub fn push(&self, future: Fut)
这意味着 async
块捕获了一个 &FuturesUnordered
。对于一个类型 Send
,它的所有字段必须是 Send
,因此对于生成的未来 Send
,&FuturesUnordered
必须是 Send
.
要使引用成为 Send
,类型也必须是 Sync
:
impl<'_, T> Send for &'_ T where
T: Sync
并且 FuturesUnordered
为 Sync
,存储的期货也必须为 Sync
:
impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}
但是boxed
返回的future不一定是Sync
:
pub fn boxed<'a>(
self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>
这意味着异步生成器不是 Send
,因此您无法将其强制转换为 dyn Future + Send
,并且您会收到一条令人困惑的错误消息。
解决方法是添加一个Sync
绑定到未来,然后手动Box::pin
:
type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
let mut futures = FuturesUnordered::<BoxedFuture>::new();
futures.push(Box::pin(async {
loop {
match rx.recv().await {
Some(data) => {
futures.push(Box::pin(async move {
let _ = data;
}));
}
None => {}
}
}
}));
但是,您随后会运行陷入一堆借贷问题。更好的解决方案是使用 tokio::select!
而不是外部 push
,正如 Michael 的回答所解释的那样。