如何用futures::sync::mpsc::channel实现阻塞队列机制?
How can I implement a blocking queue mechanism with futures::sync::mpsc::channel?
我正在尝试了解 futures::sync::mpsc::Receiver
的工作原理。在下面的示例中,接收线程休眠两秒,发送线程每秒发送一次。
我预计发件人会因为等待而需要阻塞,然后在释放缓冲区时发送。
我看到的是一段时间后就死锁了。增加通道的缓冲区只会延长时间,直到它被阻塞。
我应该怎么做才能让发送方在缓冲区可用时发送数据,并在这种情况下向发送方施加一些背压? futures::sync::mpsc::channel
有自己的文档,但我不知道如何正确使用它。
extern crate futures;
extern crate tokio_core;
use std::{thread, time};
use futures::sync::mpsc;
use futures::{Future, Sink, Stream};
use tokio_core::reactor::Core;
#[derive(Debug)]
struct Stats {
pub success: usize,
pub failure: usize,
}
fn main() {
let mut core = Core::new().expect("Failed to create core");
let remote = core.remote();
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || loop {
let tx = tx.clone();
let delay = time::Duration::from_secs(1);
thread::sleep(delay);
let f = ::futures::done::<(), ()>(Ok(()));
remote.spawn(|_| {
f.then(|res| {
println!("Sending");
tx.send(res).wait();
println!("Sent");
Ok(())
})
});
});
let mut stats = Stats {
success: 0,
failure: 0,
};
let f2 = rx.for_each(|res| {
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);
match res {
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,
}
println!("stats = {:?}", stats);
Ok(())
});
core.run(f2).expect("Core failed to run");
}
永远不要在 future 中调用 wait
。那是阻塞,阻塞永远不应该在未来进行。
永远不要在 future 中调用 sleep
。那是阻塞,阻塞永远不应该在未来进行。
通道背压是由 send
consumes the Sender
和 returns 一个未来实现的。当队列中有空间时,未来将 Sender
返回给您 。
extern crate futures; // 0.1.25
extern crate tokio; // 0.1.11
use futures::{future, sync::mpsc, Future, Sink, Stream};
use std::time::Duration;
use tokio::timer::Interval;
#[derive(Debug)]
struct Stats {
pub success: usize,
pub failure: usize,
}
fn main() {
tokio::run(future::lazy(|| {
let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);
tokio::spawn({
Interval::new_interval(Duration::from_millis(10))
.map_err(|e| panic!("Interval error: {}", e))
.fold(tx, |tx, _| {
tx.send(Ok(())).map_err(|e| panic!("Send error: {}", e))
})
.map(drop) // discard the tx
});
let mut stats = Stats {
success: 0,
failure: 0,
};
let i = Interval::new_interval(Duration::from_millis(20))
.map_err(|e| panic!("Interval error: {}", e));
rx.zip(i).for_each(move |(res, _)| {
println!("Received");
match res {
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,
}
println!("stats = {:?}", stats);
Ok(())
})
}));
}
我正在尝试了解 futures::sync::mpsc::Receiver
的工作原理。在下面的示例中,接收线程休眠两秒,发送线程每秒发送一次。
我预计发件人会因为等待而需要阻塞,然后在释放缓冲区时发送。
我看到的是一段时间后就死锁了。增加通道的缓冲区只会延长时间,直到它被阻塞。
我应该怎么做才能让发送方在缓冲区可用时发送数据,并在这种情况下向发送方施加一些背压? futures::sync::mpsc::channel
有自己的文档,但我不知道如何正确使用它。
extern crate futures;
extern crate tokio_core;
use std::{thread, time};
use futures::sync::mpsc;
use futures::{Future, Sink, Stream};
use tokio_core::reactor::Core;
#[derive(Debug)]
struct Stats {
pub success: usize,
pub failure: usize,
}
fn main() {
let mut core = Core::new().expect("Failed to create core");
let remote = core.remote();
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || loop {
let tx = tx.clone();
let delay = time::Duration::from_secs(1);
thread::sleep(delay);
let f = ::futures::done::<(), ()>(Ok(()));
remote.spawn(|_| {
f.then(|res| {
println!("Sending");
tx.send(res).wait();
println!("Sent");
Ok(())
})
});
});
let mut stats = Stats {
success: 0,
failure: 0,
};
let f2 = rx.for_each(|res| {
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);
match res {
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,
}
println!("stats = {:?}", stats);
Ok(())
});
core.run(f2).expect("Core failed to run");
}
永远不要在 future 中调用
wait
。那是阻塞,阻塞永远不应该在未来进行。永远不要在 future 中调用
sleep
。那是阻塞,阻塞永远不应该在未来进行。通道背压是由
send
consumes theSender
和 returns 一个未来实现的。当队列中有空间时,未来将Sender
返回给您 。
extern crate futures; // 0.1.25
extern crate tokio; // 0.1.11
use futures::{future, sync::mpsc, Future, Sink, Stream};
use std::time::Duration;
use tokio::timer::Interval;
#[derive(Debug)]
struct Stats {
pub success: usize,
pub failure: usize,
}
fn main() {
tokio::run(future::lazy(|| {
let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);
tokio::spawn({
Interval::new_interval(Duration::from_millis(10))
.map_err(|e| panic!("Interval error: {}", e))
.fold(tx, |tx, _| {
tx.send(Ok(())).map_err(|e| panic!("Send error: {}", e))
})
.map(drop) // discard the tx
});
let mut stats = Stats {
success: 0,
failure: 0,
};
let i = Interval::new_interval(Duration::from_millis(20))
.map_err(|e| panic!("Interval error: {}", e));
rx.zip(i).for_each(move |(res, _)| {
println!("Received");
match res {
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,
}
println!("stats = {:?}", stats);
Ok(())
})
}));
}