从顶部排出 IObservable

Drain IObservable from the top

我想消费一个可以随时填充的 IObservable。

我有这个扩展方法:

public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
{
    return Observable.Defer(() =>
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

        return source
            .Zip(queue, (v, q) => v)
            .SelectMany(v => selector(v)
                .Do(_ =>
                {

                }, () =>
                {
                    queue.OnNext(new Unit());
                })
            );
    });
}

我使用如下:

_moviesToTranslateObservable = new Subject<IMovie>();
_moviesToTranslateObservable.Drain(s => Observable.Return(s).Delay(TimeSpan.FromMilliseconds(250)))
  .Subscribe(async movieToTranslate =>
      {
      }

一旦推送新项目:

_moviesToTranslateObservable.OnNext(movieToTranslate);

IObservable 被消耗。

我的问题是,当我添加很多项目时,我不想使用第一个添加的项目,而是最后添加的项目(像堆栈,而不是队列)。

我怎样才能做到这一点? BehaviorSubject 是否适合堆栈消耗行为?

我知道变量名是 queue,但是 BehaviorSubject 并不是真正的队列,它更像是一把锁。排队确实发生在 Zip 函数内部,它带有一个内部队列。

至于在 FIFO 和 LIFO 之间切换,我不确定你想要什么标准,但这是 Drain 的 FIFO 版本。

public static IObservable<TOut> DrainReverse<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
{
    return Observable.Defer(() =>
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
        var stack = new Stack<TSource>();

        return source
            .Do(item => stack.Push(item))
            .Zip(queue, (v, q) => v)
            .Select(_ => stack.Pop())
            .SelectMany(v => selector(v)
                .Do(_ =>
                {

                }, () =>
                {
                    queue.OnNext(new Unit());
                })
            );
    });
}

与以下 运行 代码一起使用时:

var s = new Subject<int>();
var d = s.DrainReverse(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(250)));
d.Subscribe(i => Console.WriteLine(i));
s.OnNext(0);
s.OnNext(1);
s.OnNext(2);
s.OnNext(3);
s.OnNext(4);
s.OnNext(5);

正确的结果是 0, 5, 4, 3, 2, 1