如何使一个简单的 futures::sync::mpsc::channel 示例起作用?

How to make a simple futures::sync::mpsc::channel example work?

我正在尝试编写 futures-rs mpsc 队列用法的简单示例:

extern crate futures; // v0.1 (old)

use futures::{Sink, Stream};
use futures::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>(1000);

    let handle = thread::spawn(move || {
        tx.clone().send(1);
        tx.clone().send(2);
        tx.clone().send(3);
    });

    let mut rx = rx.map(|x| {
        println!("stream: {}", x);
        x * x
    });

    handle.join().unwrap();

    rx.poll().unwrap();
}

但它不会向控制台输出任何内容(我希望它会打印 stream: 1stream: 2stream: 3)。我也尝试用 rx.wait() 替换 rx.poll().unwrap(),但它仍然没有输出。而且我没有在 futures-rs 文档中找到任何使用示例。我做错了什么?

强烈建议阅读编译器告诉您的警告和错误消息。这是带有编译器的静态类型语言的一大好处:

warning: unused result which must be used: futures do nothing unless polled, #[warn(unused_must_use)] on by default
  --> src/main.rs:11:9
   |
11 |         tx.clone().send(1);
   |         ^^^^^^^^^^^^^^^^^^^

warning: unused result which must be used: futures do nothing unless polled, #[warn(unused_must_use)] on by default
  --> src/main.rs:12:9
   |
12 |         tx.clone().send(2);
   |         ^^^^^^^^^^^^^^^^^^^

warning: unused result which must be used: futures do nothing unless polled, #[warn(unused_must_use)] on by default
  --> src/main.rs:13:9
   |
13 |         tx.clone().send(3);
   |         ^^^^^^^^^^^^^^^^^^^

我不是期货专家,但是这个编译没有警告并打印所有三个值:

extern crate futures; // 0.1.23

use futures::{sync::mpsc, Async, Future, Sink, Stream};
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel(1000);

    let handle = thread::spawn(move || {
        tx.send(1)
            .and_then(|tx| tx.send(2))
            .and_then(|tx| tx.send(3))
            .wait()
            .expect("Unable to send");
    });

    let mut rx = rx.map(|x| x * x);

    handle.join().unwrap();

    while let Ok(Async::Ready(Some(v))) = rx.poll() {
        println!("stream: {}", v);
    }
}

and_then 用于在前一个值之后发送每个后续值。 wait 用于阻塞生成的线程,直到一切都成功发送。 poll 方法用于从队列中获取值,直到用完为止。有多种方法可能会失败,我将忽略它们,只关注成功案例。