从 Rust 的通道迭代器中获取第一个接收到的值

Get the first received value from an iterator of channels in rust

我有一个 futures::channel::mpsc::UnboundedReceiver<T> 的迭代器。我想处理接收者的每一个答案,一次只处理一个,但同时还要处理其他期货。

这应该可以通过循环 futures::select! 来实现。但我需要某种方法从 UnboundReceiver<T> 中获取已解析的值。 我尝试使用 futures::future::select_all(Iter),但编译失败并出现错误:futures::channel::mpsc::UnboundedReceiver<T> is not a future.

游乐场示例是 here

futures::channel::mpsc::UnboundedReceiver 实现 Stream 但不是未来,因此您可以通过调用 futures::stream::select_all(recv) 创建 SelectAll 然后解析为通过调用 select_all.next() 发送下一条就绪消息。 我通过使用它改编了你的例子:

use futures::{channel::mpsc, stream::{self, StreamExt, select_all}}; // 0.3.8
use tokio; // 1.0.1

#[tokio::main]
async fn main() -> failure::Fallible<()> {
    let mut recv = Vec::new();
    let mut futures = stream::FuturesUnordered::new();
    for _i in 0..3 {
        let (tx, rx) = mpsc::unbounded();
        recv.push(rx);
        futures.push(tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
            tx.unbounded_send("Message").unwrap();
        }));
    }
    let mut select_all = select_all(recv);
    loop {
        futures::select! {
            msg = select_all.next() => {
                println!("{:#?}", msg);
            }
            _ = futures.select_next_some() => {
                eprintln!("Thread died");
            },
            complete => break
        }
    }
    Ok(())
}

请注意,这不是多线程而是异步编程,您生成异步的 tokio 任务而不是线程。 我建议在这里阅读答案: