强制刷新计数类型 Observable.Buffer c#

Force flush count-type Observable.Buffer c#

基于这个讨论刷新基于时间的缓冲区的问题: Force flush to Observable.Buffer c#, I'm having difficulty working out how to translate this 那里给出了我按计数而不是按时间缓冲的情况的答案:

var subject = new Subject<Unit>();
var closing = Observable
    .Timer(new TimeSpan(0, 0, 1, 30))
    .Select(x => Unit.Default);

var query =
    mFluxObservable
        .Buffer(() => Observable
            .Amb(subject, closing)
            .Take(1));

我开始使用相同的 Amb 逻辑,使用 'item counter' 而不是计时器,但发现自己陷入了试图弄清楚如何重置 .

你能轻轻地推动我解决如何实现我缺少的功能吗?

var flusher = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(0.1));
var output = source.BufferExceptOnFlush(100, flusher);

我的来源是 'hot',如果有帮助...

PS:我可以使用 Observable.Create 和某种内部计数器来解决问题,但不是没有锁定...

我认为您可以通过在关闭可观察对象中使用源并将其与刷新可观察对象合并来实现。以下对我有用:

 var source = new Subject<Unit>();
 var flush = new Subject<Unit>();

 // close buffer every 3 values or when a flush value arrives
 var closing = source.Buffer(3) 
            .Select(x => Unit.Default)
            .Merge(flush);

 var query = source.Buffer(() => closing)
         .Subscribe(Console.WriteLine);

// some test values
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);

// flush buffer
flush.OnNext(Unit.Default);

这是我目前得到的:

var flush = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => Unit.Default).Publish().RefCount();
var closer = CloseGenerator(source, flush, 5);
source.Buffer(closer)

//...

private IObservable<Unit> CloseGenerator<T>(IObservable<T> source, 
                                             IObservable<Unit> flusher, int count)
{
     return Observable.CombineLatest(
                        source.Select((_, i) => i), 
                        flusher.Select((_, i) => i).StartWith(-1))
             .Select(ar => Tuple.Create(ar[0], ar[1]))
             .Scan(Tuple.Create(-1, -1), (prev, next) =>
                 {
                     if(next.Item2 != prev.Item2 || next.Item1 == prev.Item1 + count)
                         return next;
                     else
                         return prev;
                 }
             )
             .DistinctUntilChanged().Skip(1) //This is 'DistinctExceptFirst'
             .Select(_ => Unit.Default);
}

我认为 Observable.Create<T> 解决方案没有任何问题。在这种情况下,我认为这个扩展应该有效

public static IObservable<IList<T>> BufferExceptOnFlush<T>(this IObservable<T> source,IObservable<Unit> flusher, int bufferSize)
{
 return Observable.Create<IList<T>>(observer =>
 {
     var shared = source.Publish();
     var closing = shared.Buffer(bufferSize).Select(x => Unit.Default);
     var query = shared.Buffer(() => flusher.Amb(closing).Take(1)).SubscribeSafe(observer);
     return new CompositeDisposable(query, shared.Connect());
 });

我还没有测试过,但会启用这样的用法

var query = myFluxObservable.BufferExceptOnFlush(myFlusher, 5);

由于没有双重订阅,它对热和冷 observable 都有效