从 Rust 的通道迭代器中获取第一个接收到的值
Get the first received value from an iterator of channels in rust
我有一个 futures::channel::mpsc::UnboundedReceiver<T>
的迭代器。我想处理接收者的每一个答案,一次只处理一个,但同时还要处理其他期货。
这应该可以通过循环 futures::select! 来实现。但我需要某种方法从 UnboundReceiver<T>
中获取已解析的值。
我尝试使用 futures::future::select_all(Iter)
,但编译失败并出现错误:futures::channel::mpsc::UnboundedReceiver<T> is not a future
.
游乐场示例是 here。
futures::channel::mpsc::UnboundedReceiver 实现 Stream
但不是未来,因此您可以通过调用 futures::stream::select_all(recv)
创建 SelectAll
然后解析为通过调用 select_all.next()
发送下一条就绪消息。
我通过使用它改编了你的例子:
use futures::{channel::mpsc, stream::{self, StreamExt, select_all}}; // 0.3.8
use tokio; // 1.0.1
#[tokio::main]
async fn main() -> failure::Fallible<()> {
let mut recv = Vec::new();
let mut futures = stream::FuturesUnordered::new();
for _i in 0..3 {
let (tx, rx) = mpsc::unbounded();
recv.push(rx);
futures.push(tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
tx.unbounded_send("Message").unwrap();
}));
}
let mut select_all = select_all(recv);
loop {
futures::select! {
msg = select_all.next() => {
println!("{:#?}", msg);
}
_ = futures.select_next_some() => {
eprintln!("Thread died");
},
complete => break
}
}
Ok(())
}
请注意,这不是多线程而是异步编程,您生成异步的 tokio 任务而不是线程。
我建议在这里阅读答案:
我有一个 futures::channel::mpsc::UnboundedReceiver<T>
的迭代器。我想处理接收者的每一个答案,一次只处理一个,但同时还要处理其他期货。
这应该可以通过循环 futures::select! 来实现。但我需要某种方法从 UnboundReceiver<T>
中获取已解析的值。
我尝试使用 futures::future::select_all(Iter)
,但编译失败并出现错误:futures::channel::mpsc::UnboundedReceiver<T> is not a future
.
游乐场示例是 here。
futures::channel::mpsc::UnboundedReceiver 实现 Stream
但不是未来,因此您可以通过调用 futures::stream::select_all(recv)
创建 SelectAll
然后解析为通过调用 select_all.next()
发送下一条就绪消息。
我通过使用它改编了你的例子:
use futures::{channel::mpsc, stream::{self, StreamExt, select_all}}; // 0.3.8
use tokio; // 1.0.1
#[tokio::main]
async fn main() -> failure::Fallible<()> {
let mut recv = Vec::new();
let mut futures = stream::FuturesUnordered::new();
for _i in 0..3 {
let (tx, rx) = mpsc::unbounded();
recv.push(rx);
futures.push(tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
tx.unbounded_send("Message").unwrap();
}));
}
let mut select_all = select_all(recv);
loop {
futures::select! {
msg = select_all.next() => {
println!("{:#?}", msg);
}
_ = futures.select_next_some() => {
eprintln!("Thread died");
},
complete => break
}
}
Ok(())
}
请注意,这不是多线程而是异步编程,您生成异步的 tokio 任务而不是线程。
我建议在这里阅读答案: