Rx.Net Window function extension to support count, time AND size
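I actually have an IObservable<byte[]> and want to window it not only by array count and time span, but also by the overall size of the arrays that pass through the current window.
Please suggest techniques for this.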
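Here is a custom Window operator that attempts to replicate the functionality of the built-in Window overload that accepts timeSpan and count parameters, while also supporting a limit on the overall size of the window.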
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Extension method: declare it inside a static class.
public static IObservable<IObservable<TSource>> Window<TSource>(
    this IObservable<TSource> source, TimeSpan timeSpan, int count,
    int size, Func<TSource, int> sizeSelector, IScheduler scheduler = null)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (timeSpan < TimeSpan.Zero && timeSpan != Timeout.InfiniteTimeSpan)
        throw new ArgumentOutOfRangeException(nameof(timeSpan));
    if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
    if (size < 0) throw new ArgumentOutOfRangeException(nameof(size));
    if (sizeSelector == null) throw new ArgumentNullException(nameof(sizeSelector));
    scheduler = scheduler ?? Scheduler.Default;

    return Observable.Create<IObservable<TSource>>(observer =>
    {
        Subject<TSource> currentSubject = null; // the open window, if any
        IStopwatch stopwatch = null;            // stays null when timeSpan is infinite
        int itemCounter = 0;
        int currentSize = 0;

        return source.Subscribe(item =>
        {
            // Open a new window lazily, when the first item for it arrives.
            if (currentSubject == null)
            {
                currentSubject = new Subject<TSource>();
                observer.OnNext(currentSubject);
            }
            if (stopwatch == null && timeSpan != Timeout.InfiniteTimeSpan)
            {
                stopwatch = scheduler.StartStopwatch();
            }

            currentSubject.OnNext(item);
            itemCounter++;
            currentSize += sizeSelector(item);

            // Close the window as soon as any of the three limits is reached.
            if (itemCounter == count
                || (stopwatch != null && stopwatch.Elapsed >= timeSpan)
                || currentSize >= size)
            {
                currentSubject.OnCompleted();
                currentSubject = null;
                if (stopwatch != null) stopwatch = scheduler.StartStopwatch();
                itemCounter = 0;
                currentSize = 0;
            }
        }, ex =>
        {
            if (currentSubject != null) currentSubject.OnError(ex);
            observer.OnError(ex);
        }, () =>
        {
            if (currentSubject != null) currentSubject.OnCompleted();
            observer.OnCompleted();
        });
    });
}
Usage example:
observable.Window(timeSpan: TimeSpan.FromMilliseconds(5000), count: 10,
size: 100, sizeSelector: x => x.Length);
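A new window is started whenever the current window is full, the given time span has elapsed, or the accumulated size reaches or exceeds the given size. The timer is restarted each time a window is closed. Note that all three conditions are checked only when an item arrives, so an expired time span closes the window on the next item rather than at the moment it expires.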
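To see how the count and size limits interact, the closing rule can be traced on an in-memory sequence. This is only a sketch: `Partition` and `WindowRuleDemo` are hypothetical names, not part of Rx.NET, and the helper deliberately leaves out the timer so that the result is deterministic.

```csharp
using System;
using System.Collections.Generic;

public static class WindowRuleDemo
{
    // Hypothetical helper: applies the same count/size closing rule as the
    // Window operator, without the timer, to an in-memory sequence.
    public static List<List<T>> Partition<T>(
        IEnumerable<T> source, int count, int size, Func<T, int> sizeSelector)
    {
        var windows = new List<List<T>>();
        List<T> current = null;
        int currentSize = 0;
        foreach (var item in source)
        {
            // Open a window lazily, when its first item arrives.
            if (current == null)
            {
                current = new List<T>();
                windows.Add(current);
            }
            current.Add(item);
            currentSize += sizeSelector(item);
            // Close after adding the item, so the item that crosses a limit
            // still belongs to the window it closes.
            if (current.Count == count || currentSize >= size)
            {
                current = null;
                currentSize = 0;
            }
        }
        return windows;
    }

    public static void Main()
    {
        var chunks = new[]
        {
            new byte[40], new byte[70], new byte[10],
            new byte[10], new byte[10], new byte[150]
        };
        var windows = Partition(chunks, count: 3, size: 100, sizeSelector: x => x.Length);
        foreach (var w in windows)
            Console.WriteLine(string.Join(", ", w.ConvertAll(x => x.Length)));
        // Prints:
        // 40, 70
        // 10, 10, 10
        // 150
    }
}
```

The first window closes on size (40 + 70 = 110), the second on count (three items), and the third on size again. Because the limit is checked after the item is added, a window can exceed the given size by up to one item; if the size has to be a hard upper bound, the check would need to happen before adding the item instead.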