如何关闭修改和执行的`futures::sync::mpsc::Receiver` 流?

How to close a modified and executing `futures::sync::mpsc::Receiver` stream?

我希望能够按照这些思路做一些事情,以便异步关闭 Receiver 流:

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use futures::stream::AndThen;
use futures::sync::mpsc::Receiver;
use futures::{Future, Sink, Stream};
use std::sync::{Arc, Mutex};

use tokio::timer::{Delay, Interval};

fn main() {
    tokio::run(lazy(|| {
        let (tx, rx) = futures::sync::mpsc::channel(1000);

        let arc = Arc::new(Mutex::<Option<AndThen<Receiver<u32>, _, _>>>::new(None));

        {
            let mut and_then = arc.lock().unwrap();
            *and_then = Some(rx.and_then(|num| {
                println!("{}", num);
                Ok(())
            }));
        }

        let arc_clone = arc.clone();
        // This is the part I'd like to be able to do
        // After one second, close the `Receiver` so that future
        // calls to the `Sender` don't call the callback above in the
        // closure passed to `rx.and_then`
        tokio::spawn(
            Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
                .map_err(|e| eprintln!("Some delay err {:?}", e))
                .and_then(move |_| {
                    let mut maybe_stream = arc_clone.lock().unwrap();
                    match maybe_stream.take() {
                        Some(stream) => stream.into_inner().close(),
                        None => eprintln!("Can't close non-existent stream"), // line "A"
                    }
                    Ok(())
                }),
        );

        {
            let mut maybe_stream = arc.lock().unwrap();
            let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"

            let rx = stream.for_each(|_| Ok(()));
            tokio::spawn(rx);
        }

        tokio::spawn(
            Interval::new_interval(std::time::Duration::from_millis(10))
                .take(10)
                .map_err(|e| {
                    eprintln!("Interval error?! {:?}", e);
                })
                .fold((tx, 0), |(tx, i), _| {
                    tx.send(i as u32)
                        .map_err(|e| eprintln!("Send error?! {:?}", e))
                        .map(move |tx| (tx, i + 1))
                })
                .map(|_| ()),
        );

        Ok(())
    }));
}

Playground

但是,A 行运行,因为我必须移动 B 行上的流才能在其上调用 .for_each。据我所知,如果我不调用 .for_each(或类似的东西),我根本无法执行 AndThen。我不能在不实际移动对象的情况下调用 .for_each,因为 for_each 是一种移动方法。

我可以做我想做的事吗?这似乎绝对是可能的,但也许我遗漏了一些明显的东西。

我使用 0.1 的期货和 0.1 的 tokio。

您可以使用像 stream-cancel to achieve this. Here, I've used the Valved 流包装器这样的板条箱,它接受一个现有的流和 returns 一个您以后可以用来取消流的值:

use futures::{
    future::lazy,
    {Future, Sink, Stream},
}; // 0.1.25
use stream_cancel::Valved; // 0.4.4
use tokio::timer::{Delay, Interval}; // 0.1.13

fn main() {
    tokio::run(lazy(|| {
        let (tx, rx) = futures::sync::mpsc::channel(1000);
        let (trigger, rx) = Valved::new(rx);

        tokio::spawn({
            rx.for_each(|num| {
                println!("{}", num);
                Ok(())
            })
        });

        tokio::spawn({
            Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
                .map_err(|e| eprintln!("Some delay err {:?}", e))
                .map(move |_| trigger.cancel()),
        });

        tokio::spawn({
            Interval::new_interval(std::time::Duration::from_millis(10))
                .take(10)
                .map_err(|e| eprintln!("Interval error?! {:?}", e))
                .fold((tx, 0), |(tx, i), _| {
                    tx.send(i)
                        .map_err(|e| eprintln!("Send error?! {:?}", e))
                        .map(move |tx| (tx, i + 1))
                })
                .map(|_| ()),
        });

        Ok(())
    }));
}

箱子还有其他类型适用于略有不同的用例,请务必查看文档。

请参阅 了解一种自己实现的方法。

不会撒谎,我和@shepmaster 一起解决这个问题,你的问题很不清楚。也就是说,感觉 就像您正在尝试做一些 futuresmpsc 部分不适合做的事情。

总之。讲解时间。

每当你 combine/compose 流(或期货!)时,每个组合方法都需要 self,而不是我认为你可能希望的 &self&mut self

当您到达您的代码块时:

    {
        let mut maybe_stream = arc.lock().unwrap();
        let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"

        let rx = stream.for_each(|_| Ok(()));
        tokio::spawn(rx);
    }

...当您 take() 时,流从 Arc<Option<Receiver<T>>> 中提取,其内容被 None 替换。然后在 Tokio 反应器上生成它,它开始处理这部分。此 rx 现在处于循环状态,您无法再使用。此外,您的 maybe_stream 现在包含 None.

延迟一段时间后,您尝试 take() Arc<Option<Receiver<T>>> 的内容(A 行)。因为现在什么都没有了,你也什么都没有了,因此也没有什么可以关闭的了。您的代码出错了。

与其传递 mpsc::Receiver 并希望销毁它,不如使用一种机制来停止流本身。您可以自己这样做,也可以使用像 stream-cancel 这样的箱子来为您这样做。

DIY 版本在这里,根据您的代码修改:

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use futures::{future, Future, Sink, Stream};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{Ordering, AtomicBool};
use tokio::timer::{Delay, Interval};

fn main() {
    tokio::run(lazy(|| {
        let (tx, rx) = futures::sync::mpsc::channel(1000);

        let circuit_breaker:Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
        let c_b_copy = Arc::clone(&circuit_breaker);
        tokio::spawn(
            Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
                .map_err(|e| eprintln!("Some delay err {:?}", e))
                .and_then(move |_| {
                    // We set the CB to true in order to stop processing of the stream
                    circuit_breaker.store(true, Ordering::Relaxed);
                    Ok(())
                }),
        );

        {
            let rx2 = rx.for_each(|e| {
                println!("{:?}", e);
                Ok(())
            });
            tokio::spawn(rx2);
        }

        tokio::spawn(
            Interval::new_interval(std::time::Duration::from_millis(100))
                .take(100)
                // take_while causes the stream to continue as long as its argument returns a future resolving to true.
                // In this case, we're checking every time if the circuit-breaker we've introduced is false
                .take_while(move |_| {
                    future::ok(
                        c_b_copy.load(Ordering::Relaxed) == false
                    );
                })
                .map_err(|e| {
                    eprintln!("Interval error?! {:?}", e);
                })
                .fold((tx, 0), |(tx, i), _| {
                    tx.send(i as u32)
                        .map_err(|e| eprintln!("Send error?! {:?}", e))
                        .map(move |tx| (tx, i + 1))
                })
                .map(|_| ()),
        );

        Ok(())
    }));
}

Playground

添加的 take_while() 允许您对流的内容或外部谓词进行操作以继续或停止流。请注意,即使我们使用 AtomicBool,由于 Tokio 的 'static 生命周期要求,我们仍然需要 Arc

逆流

在评论中进行一些讨论后,this solution 可能更适合您的用例。我有效地实现了一个由断路器覆盖的扇出流。奇迹发生在这里:

impl<S> Stream for FanOut<S> where S:Stream, S::Item:Clone {

    type Item = S::Item;

    type Error = S::Error;

    fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
        match self.inner.as_mut() {
            Some(ref mut r) => {
                let mut breaker = self.breaker.write().expect("Poisoned lock");
                match breaker.status {
                    false => {
                        let item = r.poll();
                        match &item {
                            &Ok(Async::Ready(Some(ref i))) => {
                                breaker.registry.iter_mut().for_each(|sender| {
                                    sender.try_send(i.clone()).expect("Dead channel");
                                });
                                item
                            },
                            _ => item
                        }
                    },
                    true => Ok(Async::Ready(None))
                }
            }
            _ => {

                let mut breaker = self.breaker.write().expect("Poisoned lock");
                // Stream is over, drop all the senders

                breaker.registry = vec![];
                Ok(Async::Ready(None))
            }
        }
    }
}

如果状态指示器设置为false,则轮询上述流;然后将结果发送给所有听众。如果poll的结果是Async::Ready(None)(表示流结束),则关闭所有监听通道。

如果状态指示器设置为 true,所有侦听器通道都将关闭,并且流 returns Async::Ready(None)(并被 Tokio 从执行中删除)。

FanOut 对象是可克隆的,但只有初始实例可以执行任何操作。