How do I extract messages from an unbounded queue every N seconds and spawn them on to a Tokio handler?

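I'm trying to extract messages (which are themselves futures) from an unbounded queue every N seconds and spawn them onto a Tokio handler.

I've tried dozens of variations, but I can't seem to find the right approach. It looks like it should be possible, but I always hit a future type mismatch or end up with borrow issues.

This is the code that shows more or less what I want: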

let fut = Interval::new_interval(Duration::from_secs(1))
        .for_each(|num| vantage_dequeuer.into_future() )
        .for_each(|message:VantageMessage |{
            handle.spawn(message);
            return Ok(());
        })
        .map_err(|e| panic!("delay errored; err={:?}", e));

core.run(fut);

Full code:

extern crate futures; // 0.1.24
extern crate tokio; // 0.1.8
extern crate tokio_core; // 0.1.17

use futures::future::ok;
use futures::sync::mpsc;
use futures::{Future, Stream};
use std::thread;
use std::time::Duration;
use tokio::timer::Interval;
use tokio_core::reactor::Core;

type VantageMessage = Box<Future<Item = (), Error = ()> + Send>;

fn main() {
    let (enqueuer, dequeuer) = mpsc::unbounded();
    let new_fut: VantageMessage = Box::new(ok(()).and_then(|_| {
        println!("Message!");
        return Ok(());
    }));
    enqueuer.unbounded_send(new_fut);
    let joinHandle = worker(Some(dequeuer));
    joinHandle.join();
}

/*
    Every second extract one message from dequeuer (or wait if not available)
    and spawn it in the core
*/
fn worker(
    mut vantage_dequeuer: Option<mpsc::UnboundedReceiver<VantageMessage>>,
) -> thread::JoinHandle<()> {
    let dequeuer = dequeuer.take().unwrap();
    let joinHandle = thread::spawn(|| {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let fut = Interval::new_interval(Duration::from_secs(1))
            .for_each(|num| vantage_dequeuer.into_future())
            .for_each(|message: VantageMessage| {
                handle.spawn(message);
                return Ok(());
            })
            .map_err(|e| panic!("delay errored; err={:?}", e));

        core.run(fut);
        println!("Returned!");
    });
    return joinHandle;
}

error[E0425]: cannot find value `dequeuer` in this scope
  --> src/main.rs:33:20
   |
33 |     let dequeuer = dequeuer.take().unwrap();
   |                    ^^^^^^^^ not found in this scope

error[E0599]: no method named `into_future` found for type `std::option::Option<futures::sync::mpsc::UnboundedReceiver<std::boxed::Box<(dyn futures::Future<Item=(), Error=()> + std::marker::Send + 'static)>>>` in the current scope
  --> src/main.rs:38:46
   |
38 |             .for_each(|num| vantage_dequeuer.into_future())
   |                                              ^^^^^^^^^^^
   |
   = note: the method `into_future` exists but the following trait bounds were not satisfied:
           `&mut std::option::Option<futures::sync::mpsc::UnboundedReceiver<std::boxed::Box<(dyn futures::Future<Item=(), Error=()> + std::marker::Send + 'static)>>> : futures::Stream`

Interval and UnboundedReceiver are both streams, so I would use Stream::zip to combine them:

The zipped stream waits for both streams to produce an item, and then returns that pair. If an error happens, then that error will be returned immediately. If either stream ends then the zipped stream will also end.

extern crate futures; // 0.1.24
extern crate tokio;   // 0.1.8
extern crate tokio_core; // 0.1.17

use futures::{
    future::ok,
    sync::mpsc,
    {Future, Stream},
};
use std::{thread, time::Duration};
use tokio::timer::Interval;
use tokio_core::reactor::Core;

type VantageMessage = Box<Future<Item = (), Error = ()> + Send>;

pub fn main() {
    let (tx, rx) = mpsc::unbounded();

    let new_fut: VantageMessage = Box::new(ok(()).and_then(|_| {
        println!("Message!");
        Ok(())
    }));
    tx.unbounded_send(new_fut).expect("Unable to send");
    drop(tx); // Close the sending side

    worker(rx).join().expect("Thread had a panic");
}

fn worker(queue: mpsc::UnboundedReceiver<VantageMessage>) -> thread::JoinHandle<()> {
    thread::spawn(|| {
        let mut core = Core::new().unwrap();
        let handle = core.handle();

        core.run({
            Interval::new_interval(Duration::from_secs(1))
                .map_err(|e| panic!("delay errored; err={}", e))
                .zip(queue)
                .for_each(|(_, message)| {
                    handle.spawn(message);
                    Ok(())
                })
        })
        .expect("Unable to run reactor");
        println!("Returned!");
    })
}
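
For intuition about what zip is doing here, the same pairing can be sketched with blocking standard-library pieces instead of Tokio. This is only an analogy under stated assumptions, not how the Tokio timer works: drain_paced is a hypothetical helper, a sleeping iterator stands in for Interval, and std::sync::mpsc stands in for the futures channel. Each message is released only after a tick, and the loop ends once every sender has been dropped, which is why the example above calls drop(tx).

```rust
use std::sync::mpsc::Receiver;
use std::thread;
use std::time::Duration;

/// Hypothetical helper (not part of Tokio or futures): pairs each tick with
/// one queued message, mirroring the shape of `Interval.zip(queue)` but with
/// blocking std primitives.
fn drain_paced<T>(queue: Receiver<T>, period: Duration) -> Vec<T> {
    // An endless stand-in for the interval: producing each tick sleeps for one period.
    let ticks = std::iter::repeat(()).map(move |_| thread::sleep(period));

    // `zip` yields a pair only when both sides have an item, and it stops as
    // soon as the queue iterator ends, which happens once every sender is dropped.
    ticks
        .zip(queue)
        .map(|(_tick, message)| message)
        .collect()
}
```

Called with a receiver whose sender has pushed a few messages and then been dropped, this returns those messages in their original order, one per tick.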

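Note that the tokio_core version doesn't actually wait for any of the spawned futures to complete before the reactor shuts down. If you want that, I'd switch to tokio::run and tokio::spawn: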

fn worker(queue: mpsc::UnboundedReceiver<VantageMessage>) -> thread::JoinHandle<()> {
    thread::spawn(|| {
        tokio::run({
            Interval::new_interval(Duration::from_secs(1))
                .map_err(|e| panic!("delay errored; err={}", e))
                .zip(queue)
                .for_each(|(_, message)| {
                    tokio::spawn(message);
                    Ok(())
                })
        });
        println!("Returned!");
    })
}
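
The difference between returning as soon as the queue is drained and returning only after the spawned work has finished can also be sketched with plain threads. This is a loose analogy rather than Tokio's mechanism: Job and run_to_completion are hypothetical names, and a boxed closure stands in for the boxed future. The point is the explicit join step, which is the waiting behavior the tokio::run version provides.

```rust
use std::sync::mpsc::Receiver;
use std::thread;

// Hypothetical message type for this sketch: a boxed closure instead of a boxed future.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Spawns every queued job on its own thread, then waits for all of them
/// before returning. Returns how many jobs were spawned.
fn run_to_completion(queue: Receiver<Job>) -> usize {
    // Spawn each job as it arrives and keep its handle so nothing is abandoned.
    // The iterator ends once every sender has been dropped.
    let handles: Vec<thread::JoinHandle<()>> = queue
        .into_iter()
        .map(|job| thread::spawn(job))
        .collect();
    let spawned = handles.len();

    // Without this loop the function could return while jobs are still running.
    for handle in handles {
        handle.join().expect("job panicked");
    }
    spawned
}
```

Dropping the join loop gives the thread-based counterpart of the first worker: the function returns once the queue is empty, whether or not the jobs have finished.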