防止 `chan::Receiver` 在空缓冲区上阻塞

Prevent `chan::Receiver` from blocking on empty buffer

我想构建一个多生产者多消费者 (MPMC) 通道,其中包含不同的并发任务处理和生产数据。其中一些任务负责与文件系统或网络接口。

两个例子:

为此,我选择 chan 作为 MPMC 频道提供者,并选择 tokio 作为系统来管理频道上每个听众的事件循环。

看完tokio's site上的例子后,我开始为chan::Receiver实现futures::stream::Stream。这将允许为每个未来使用 a 来收听频道。然而,这两个库的文档突出了一个冲突:

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>

Attempt to pull out the next value of this stream, returning None if the stream is finished.

This method, like Future::poll, is the sole method of pulling out a value from a stream. This method must also be run within the context of a task typically and implementors of this trait must ensure that implementations of this method do not block, as it may cause consumers to behave badly.

fn recv(&self) -> Option<T>

Receive a value on this channel.

If this is an asnychronous channel, recv only blocks when the buffer is empty.

If this is a synchronous channel, recv only blocks when the buffer is empty.

If this is a rendezvous channel, recv blocks until a corresponding send sends a value.

For all channels, if the channel is closed and the buffer is empty, then recv always and immediately returns None. (If the buffer is non-empty on a closed channel, then values from the buffer are returned.)

Values are guaranteed to be received in the same order that they are sent.

This operation will never panic! but it can deadlock if the channel is never closed.

chan::Receiver 可能会在缓冲区为空时阻塞,但 futures::stream::Stream 预计在轮询时永远不会阻塞。

如果空缓冲区阻塞,则没有明确的方法来确认它是空的。如何检查缓冲区是否为空以防止阻塞?

虽然 Kabuki 在我的雷达上并且似乎是最成熟的 actor 模型箱,但它几乎完全缺乏文档。


这是我目前的实现:

extern crate chan;
extern crate futures;

struct RX<T>(chan::Receiver<T>);

impl<T> futures::stream::Stream for RX<T> {
    type Item = T;
    type Error = Box<std::error::Error>;

    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
        let &mut RX(ref receiver) = self;
        let item = receiver.recv();

        match item {
            Some(value) => Ok(futures::Async::Ready(Some(value))),
            None => Ok(futures::Async::NotReady),
        }
    }
}

我已经完成了一个快速测试,看看它是如何工作的。看起来不错,但正如预期的那样,在完成缓冲区后确实会阻塞。虽然这应该有效,但我有点担心消费者 "behave badly" 意味着什么。现在我将继续测试这种方法,希望我不会遇到不良行为。

extern crate chan;
extern crate futures;
use futures::{Stream, Future};

fn my_test() {
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let (tx, rx) = chan::async::<String>();

    tx.send("Hello".to_string()); // fill the buffer before it blocks; single thread here.

    let incoming = RX(rx).for_each(|s| {
        println!("Result: {}", s);

        Ok(())
    });

    core.run(incoming).unwrap()
}

chan crate 提供了 chan_select macro that would allow a non-blocking recv; but to implement Future for such primitives you also need to wake up the task when the channel becomes ready (see futures::task::current()).

您可以使用现有原语实现Future;实施新的通常更困难。在这种情况下,您可能需要 fork chan 以使其 Future 兼容。

似乎 multiqueue crate 有一个 Future 兼容的 mpmc 通道 mpmc_fut_queue