从顶部排出 IObservable
Drain IObservable from the top
我想消费一个可以随时填充的 IObservable。
我有这个扩展方法:
public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ =>
{
}, () =>
{
queue.OnNext(new Unit());
})
);
});
}
我使用如下:
_moviesToTranslateObservable = new Subject<IMovie>();
_moviesToTranslateObservable.Drain(s => Observable.Return(s).Delay(TimeSpan.FromMilliseconds(250)))
.Subscribe(async movieToTranslate =>
{
}
一旦推送新项目:
_moviesToTranslateObservable.OnNext(movieToTranslate);
IObservable 被消耗。
我的问题是,当我添加很多项目时,我不想使用第一个添加的项目,而是最后添加的项目(像堆栈,而不是队列)。
我怎样才能做到这一点? BehaviorSubject 是否适合堆栈消耗行为?
我知道变量名是 queue
,但是 BehaviorSubject
并不是真正的队列,它更像是一把锁。排队确实发生在 Zip
函数内部,它带有一个内部队列。
至于在 FIFO 和 LIFO 之间切换,我不确定你想要什么标准,但这是 Drain
的 FIFO 版本。
public static IObservable<TOut> DrainReverse<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
var stack = new Stack<TSource>();
return source
.Do(item => stack.Push(item))
.Zip(queue, (v, q) => v)
.Select(_ => stack.Pop())
.SelectMany(v => selector(v)
.Do(_ =>
{
}, () =>
{
queue.OnNext(new Unit());
})
);
});
}
与以下 运行 代码一起使用时:
var s = new Subject<int>();
var d = s.DrainReverse(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(250)));
d.Subscribe(i => Console.WriteLine(i));
s.OnNext(0);
s.OnNext(1);
s.OnNext(2);
s.OnNext(3);
s.OnNext(4);
s.OnNext(5);
正确的结果是 0, 5, 4, 3, 2, 1
我想消费一个可以随时填充的 IObservable。
我有这个扩展方法:
public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ =>
{
}, () =>
{
queue.OnNext(new Unit());
})
);
});
}
我使用如下:
_moviesToTranslateObservable = new Subject<IMovie>();
_moviesToTranslateObservable.Drain(s => Observable.Return(s).Delay(TimeSpan.FromMilliseconds(250)))
.Subscribe(async movieToTranslate =>
{
}
一旦推送新项目:
_moviesToTranslateObservable.OnNext(movieToTranslate);
IObservable 被消耗。
我的问题是,当我添加很多项目时,我不想使用第一个添加的项目,而是最后添加的项目(像堆栈,而不是队列)。
我怎样才能做到这一点? BehaviorSubject 是否适合堆栈消耗行为?
我知道变量名是 queue
,但是 BehaviorSubject
并不是真正的队列,它更像是一把锁。排队确实发生在 Zip
函数内部,它带有一个内部队列。
至于在 FIFO 和 LIFO 之间切换,我不确定你想要什么标准,但这是 Drain
的 FIFO 版本。
public static IObservable<TOut> DrainReverse<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
var stack = new Stack<TSource>();
return source
.Do(item => stack.Push(item))
.Zip(queue, (v, q) => v)
.Select(_ => stack.Pop())
.SelectMany(v => selector(v)
.Do(_ =>
{
}, () =>
{
queue.OnNext(new Unit());
})
);
});
}
与以下 运行 代码一起使用时:
var s = new Subject<int>();
var d = s.DrainReverse(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(250)));
d.Subscribe(i => Console.WriteLine(i));
s.OnNext(0);
s.OnNext(1);
s.OnNext(2);
s.OnNext(3);
s.OnNext(4);
s.OnNext(5);
正确的结果是 0, 5, 4, 3, 2, 1