延迟 Tokio Stream
Delaying a Tokio Stream
给定一个 Stream
,我想创建一个新的 Stream
,其中生成的元素之间有时间延迟。
我尝试编写代码来使用 tokio_core::reactor::Timeout
和 and_then
组合器实现 Stream
s,但延迟不起作用:我立即获得所有元素,刻不容缓。
这是一个独立的示例 (playground):
extern crate tokio_core;
extern crate futures;
use std::time::Duration;
use futures::{Future, Stream, stream, Sink};
use self::futures::sync::{mpsc};
use tokio_core::reactor;
const NUM_ITEMS: u32 = 8;
fn main() {
let mut core = reactor::Core::new().unwrap();
let handle = core.handle();
let chandle = handle.clone();
let (sink, stream) = mpsc::channel::<u32>(0);
let send_stream = stream::iter_ok(0 .. NUM_ITEMS)
.and_then(move |i: u32| {
let cchandle = chandle.clone();
println!("Creating a timeout object...");
reactor::Timeout::new(Duration::new(1,0), &cchandle)
.map_err(|_| ())
.and_then(|_| Ok(i))
});
let sink = sink.sink_map_err(|_| ());
handle.spawn(sink.send_all(send_stream).and_then(|_| Ok(())));
let mut incoming_items = Vec::new();
{
let keep_messages = stream.for_each(|item| {
incoming_items.push(item);
println!("item = {}", item);
Ok(())
});
core.run(keep_messages).unwrap();
}
assert_eq!(incoming_items, (0 .. NUM_ITEMS).collect::<Vec<u32>>());
}
为了完整起见,这是我得到的输出:
Creating a timeout object...
Creating a timeout object...
item = 0
Creating a timeout object...
item = 1
Creating a timeout object...
item = 2
Creating a timeout object...
item = 3
Creating a timeout object...
item = 4
Creating a timeout object...
item = 5
Creating a timeout object...
item = 6
item = 7
我怀疑问题出在这些行中:
reactor::Timeout::new(Duration::new(1,0), &cchandle)
.map_err(|_| ())
.and_then(|_| Ok(i))
可能我并没有真正等待返回的 Timeout
对象,但我不确定如何解决它。
正如我所怀疑的,问题是新创建的 Timeout
的操作(使用 and_then
)。我们要么需要先解包调用 reactor::Timeout::new
的结果,如果手动完成可能会变得混乱,或者使用 into_future
将结果转换为 Future
,然后使用它使用 Future
组合器。
解题代码:
extern crate tokio_core;
extern crate futures;
use std::time::Duration;
use futures::{Future, Stream, stream, Sink, IntoFuture};
use self::futures::sync::{mpsc};
use tokio_core::reactor;
const NUM_ITEMS: u32 = 8;
fn main() {
let mut core = reactor::Core::new().unwrap();
let handle = core.handle();
let chandle = handle.clone();
let (sink, stream) = mpsc::channel::<u32>(0);
let send_stream = stream::iter_ok(0 .. NUM_ITEMS)
.and_then(move |i: u32| {
let cchandle = chandle.clone();
println!("Creating a timeout object...");
reactor::Timeout::new(Duration::new(1,0), &cchandle)
.into_future()
.and_then(move |timeout| timeout.and_then(move |_| Ok(i)))
.map_err(|_| ())
});
let sink = sink.sink_map_err(|_| ());
handle.spawn(sink.send_all(send_stream).and_then(|_| Ok(())));
let mut incoming_items = Vec::new();
{
let keep_messages = stream.for_each(|item| {
incoming_items.push(item);
println!("item = {}", item);
Ok(())
});
core.run(keep_messages).unwrap();
}
assert_eq!(incoming_items, (0 .. NUM_ITEMS).collect::<Vec<u32>>());
}
请注意,使用了两个 and_then
。第一个解包通过调用 reactor::Timeout::new
获得的 Result
。第二个实际上等待 Timeout
触发。
给定一个 Stream
,我想创建一个新的 Stream
,其中生成的元素之间有时间延迟。
我尝试编写代码来使用 tokio_core::reactor::Timeout
和 and_then
组合器实现 Stream
s,但延迟不起作用:我立即获得所有元素,刻不容缓。
这是一个独立的示例 (playground):
extern crate tokio_core;
extern crate futures;
use std::time::Duration;
use futures::{Future, Stream, stream, Sink};
use self::futures::sync::{mpsc};
use tokio_core::reactor;
const NUM_ITEMS: u32 = 8;
fn main() {
let mut core = reactor::Core::new().unwrap();
let handle = core.handle();
let chandle = handle.clone();
let (sink, stream) = mpsc::channel::<u32>(0);
let send_stream = stream::iter_ok(0 .. NUM_ITEMS)
.and_then(move |i: u32| {
let cchandle = chandle.clone();
println!("Creating a timeout object...");
reactor::Timeout::new(Duration::new(1,0), &cchandle)
.map_err(|_| ())
.and_then(|_| Ok(i))
});
let sink = sink.sink_map_err(|_| ());
handle.spawn(sink.send_all(send_stream).and_then(|_| Ok(())));
let mut incoming_items = Vec::new();
{
let keep_messages = stream.for_each(|item| {
incoming_items.push(item);
println!("item = {}", item);
Ok(())
});
core.run(keep_messages).unwrap();
}
assert_eq!(incoming_items, (0 .. NUM_ITEMS).collect::<Vec<u32>>());
}
为了完整起见,这是我得到的输出:
Creating a timeout object...
Creating a timeout object...
item = 0
Creating a timeout object...
item = 1
Creating a timeout object...
item = 2
Creating a timeout object...
item = 3
Creating a timeout object...
item = 4
Creating a timeout object...
item = 5
Creating a timeout object...
item = 6
item = 7
我怀疑问题出在这些行中:
reactor::Timeout::new(Duration::new(1,0), &cchandle)
.map_err(|_| ())
.and_then(|_| Ok(i))
可能我并没有真正等待返回的 Timeout
对象,但我不确定如何解决它。
正如我所怀疑的,问题是新创建的 Timeout
的操作(使用 and_then
)。我们要么需要先解包调用 reactor::Timeout::new
的结果,如果手动完成可能会变得混乱,或者使用 into_future
将结果转换为 Future
,然后使用它使用 Future
组合器。
解题代码:
extern crate tokio_core;
extern crate futures;
use std::time::Duration;
use futures::{Future, Stream, stream, Sink, IntoFuture};
use self::futures::sync::{mpsc};
use tokio_core::reactor;
const NUM_ITEMS: u32 = 8;
fn main() {
let mut core = reactor::Core::new().unwrap();
let handle = core.handle();
let chandle = handle.clone();
let (sink, stream) = mpsc::channel::<u32>(0);
let send_stream = stream::iter_ok(0 .. NUM_ITEMS)
.and_then(move |i: u32| {
let cchandle = chandle.clone();
println!("Creating a timeout object...");
reactor::Timeout::new(Duration::new(1,0), &cchandle)
.into_future()
.and_then(move |timeout| timeout.and_then(move |_| Ok(i)))
.map_err(|_| ())
});
let sink = sink.sink_map_err(|_| ());
handle.spawn(sink.send_all(send_stream).and_then(|_| Ok(())));
let mut incoming_items = Vec::new();
{
let keep_messages = stream.for_each(|item| {
incoming_items.push(item);
println!("item = {}", item);
Ok(())
});
core.run(keep_messages).unwrap();
}
assert_eq!(incoming_items, (0 .. NUM_ITEMS).collect::<Vec<u32>>());
}
请注意,使用了两个 and_then
。第一个解包通过调用 reactor::Timeout::new
获得的 Result
。第二个实际上等待 Timeout
触发。