Rx.Net Window 功能扩展以支持计数、时间和大小

Rx.Net Window function extension to support count, time AND size

实际上我有一个 IObservable<byte[]> 并且想要 window 它不仅在数组计数和时间范围上而且在通过当前 window 的数组的整体大小上。

请提供这些技巧

这是一个自定义 Window 运算符,它试图复制内置 Window 重载的功能,该重载接受 timeSpancount 参数,同时还支持window 整体尺寸。

public static IObservable<IObservable<TSource>> Window<TSource>(
    this IObservable<TSource> source, TimeSpan timeSpan, int count,
    int size, Func<TSource, int> sizeSelector, IScheduler scheduler = null)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (timeSpan < TimeSpan.Zero && timeSpan != Timeout.InfiniteTimeSpan)
        throw new ArgumentOutOfRangeException(nameof(timeSpan));
    if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
    if (size < 0) throw new ArgumentOutOfRangeException(nameof(size));
    if (sizeSelector == null) throw new ArgumentNullException(nameof(sizeSelector));
    scheduler = scheduler ?? Scheduler.Default;

    return Observable.Create<IObservable<TSource>>(observer =>
    {
        Subject<TSource> currentSubject = null;
        IStopwatch stopwatch = null;
        int itemCounter = 0;
        int currentSize = 0;

        return source.Subscribe(item =>
        {
            if (currentSubject == null)
            {
                currentSubject = new Subject<TSource>();
                observer.OnNext(currentSubject);
            }
            if (stopwatch == null && timeSpan != Timeout.InfiniteTimeSpan)
            {
                stopwatch = scheduler.StartStopwatch();
            }

            currentSubject.OnNext(item);
            itemCounter++;
            currentSize += sizeSelector(item);

            if (itemCounter == count
                || (stopwatch != null && stopwatch.Elapsed >= timeSpan)
                || currentSize >= size)
            {
                currentSubject.OnCompleted();
                currentSubject = null;
                if (stopwatch != null) stopwatch = scheduler.StartStopwatch();
                itemCounter = 0;
                currentSize = 0;
            }
        }, ex =>
        {
            if (currentSubject != null) currentSubject.OnError(ex);
            observer.OnError(ex);
        }, () =>
        {
            if (currentSubject != null) currentSubject.OnCompleted();
            observer.OnCompleted();
        });
    });
}

用法示例:

observable.Window(timeSpan: TimeSpan.FromMilliseconds(5000), count: 10,
    size: 100, sizeSelector: x => x.Length);

每次当前 window 已满,或者给定的时间已经过去,或者累积的大小变得大于给定的大小时,都会创建一个新的 window。每次发出 window 时都会重新启动计时器。