按时间缓冲或 运行 响应式扩展总和
Buffer by time or running sum for reactive extensions
我对 Reactive Extensions 很陌生,想根据时间或 运行 总和不超过阈值(每个项目的大小由 lambda 指定)来缓冲流,以先发生者为准,很像现有的 Buffer
按计数或时间。
目前我已经编写了自己的 Buffer
方法实现,它按预期工作,使用 IScheduler
触发超时,然后在内存中管理我自己的缓冲区并在累计总和超过了阈值,但这感觉有点低,我认为必须有一个更优雅的解决方案来以某种方式使用现有的反应操作来表达它,也许使用 Buffer
的 TBufferClosing
重载相反。
到目前为止我想出的最佳解决方案如下,但它的缺点是包含导致阈值导致总和大于请求的最大总和的最后一项:
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
{
var shared = source.Publish().RefCount();
return shared.Buffer(() => Observable.Amb(
Observable.Timer(timeSpan)
.Select(_ => Unit.Default),
shared.Select(sizeSelector)
.Scan((a, b) => a + b)
.SkipWhile(accumulated => accumulated < maxSize)
.Select(_ => Unit.Default))
);
}
这是否可以与现有的操作符一起工作(通过调整我上面的版本或完全以其他方式),或者我是否被迫保留我的自定义 Buffer
实现处理计时器并缓冲自己?
好的,这应该可以。迟到总比没有好。我认为没有比使用 Buffer
运算符做得更好的方法了。
问题的核心是状态机问题,这意味着您需要 Scan
解决方案。这样做的问题是,您有两个不同的来源可以改变您的状态:新项目和超时。 Scan
并不真正适用于两个多源,因此我们必须以某种方式将这两种事件类型合并为一个。
我做了 something similar before with Discriminated Unions,这个概念在这里应该行得通。首先是解决方案(使用 Nuget 包 System.Collections.Immutable
):
public static class X
{
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); //our time-out mechanism
return source
.Publish(_source => _source
.Union(queue.Delay(bufferTimeSpan))
.ScanUnion(
(list: ImmutableList<TSource>.Empty, size: 0, emitValue: (ImmutableList<TSource>)null),
(state, item) =>
{ // item handler
var itemSize = sizeSelector(item);
var newSize = state.size + itemSize;
if (newSize > maxSize)
{
queue.OnNext(Unit.Default);
return (ImmutableList<TSource>.Empty.Add(item), itemSize, state.list);
}
else
return (state.list.Add(item), newSize, null);
},
(state, _) =>
{ // time out handler
queue.OnNext(Unit.Default);
return (ImmutableList<TSource>.Empty, 0, state.list);
}
)
.Where(t => t.emitValue != null)
.Select(t => t.emitValue.ToList())
.TakeUntil(_source.IgnoreElements().Delay(bufferTimeSpan).Materialize())
);
}
}
说明:Union
将两个不同类型的流合并为一个流,其中项目可以是类型 A 或类型 B。ScanUnion
的工作方式与 Scan
相同,但是提供两个函数来处理两种不同类型的项目。
只要新缓冲区 window 打开,就会命中 BehaviorSubject
,Delay
运算符确保 Scan
在定义的时间跨度后获取它。 Scan
中的状态包含当前缓冲项的列表,'size' 中的状态。 emitValue
在缓冲区 window 关闭时使用,并传递值。
这是 Discriminated Union 帮助程序代码:
public static class DUnionExtensions
{
public class DUnion<T1, T2>
{
public DUnion(T1 t1)
{
Type1Item = t1;
Type2Item = default(T2);
IsType1 = true;
}
public DUnion(T2 t2, bool ignored) //extra parameter to disambiguate in case T1 == T2
{
Type2Item = t2;
Type1Item = default(T1);
IsType1 = false;
}
public bool IsType1 { get; }
public bool IsType2 => !IsType1;
public T1 Type1Item { get; }
public T2 Type2Item { get; }
}
public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
{
return a.Select(x => new DUnion<T1, T2>(x))
.Merge(b.Select(x => new DUnion<T1, T2>(x, false)));
}
public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
TState initialState,
Func<TState, T1, TState> type1Handler,
Func<TState, T2, TState> type2Handler)
{
return source.Scan(initialState, (state, u) => u.IsType1
? type1Handler(state, u.Type1Item)
: type2Handler(state, u.Type2Item)
);
}
}
用函数式编程解决这个特殊问题可能会变得相当复杂。命令式方法更易于编码,并且更容易处理极端情况恕我直言。
public static IObservable<IList<TSource>> Buffer<TSource>(
this IObservable<TSource> source,
Func<TSource, int> sizeSelector,
int maxSize, TimeSpan timeSpan)
{
return Observable.Create<IList<TSource>>(observer =>
{
var locker = new object();
var buffer = new List<TSource>();
int sumSize = 0;
Timer timer = null;
void EmitBuffer()
{
Debug.Assert(Monitor.IsEntered(locker));
timer?.Dispose();
observer.OnNext(buffer);
buffer = new List<TSource>();
sumSize = 0;
timer = new Timer(OnTick, buffer, timeSpan, Timeout.InfiniteTimeSpan);
}
void OnTick(object state)
{
// Only emit in case the timer is associated with the current buffer
lock (locker) if (ReferenceEquals(state, buffer)) EmitBuffer();
}
timer = new Timer(OnTick, buffer, timeSpan, Timeout.InfiniteTimeSpan);
return source.Subscribe(value =>
{
// It is possible to emit two buffers at once
lock (locker)
{
var size = sizeSelector(value);
if (sumSize + size > maxSize && buffer.Count > 0) EmitBuffer();
buffer.Add(value);
sumSize += size;
if (sumSize >= maxSize) EmitBuffer();
}
}, exception =>
{
lock (locker)
{
timer?.Dispose();
buffer = null; // Cleanup
observer.OnError(exception);
}
}, () =>
{
lock (locker)
{
timer?.Dispose();
if (buffer.Count > 0) observer.OnNext(buffer);
buffer = null; // Cleanup
observer.OnCompleted();
}
});
});
}
注意: 此实现假定 sizeSelector
委托从不抛出。如果不是这种情况,您需要将 Subscribe
调用替换为找到 here 的自定义 SubscribeSafe
扩展方法。否则 sizeSelector
抛出的异常将得不到处理并导致进程崩溃。
此外,如果您决定使用 IScheduler scheduler
参数增强此方法,请注意不要关闭 buffer
,因为引用是可变的。这将导致错误:scheduler.Schedule(() => observer.OnNext(buffer))
。而是使用引用的本地副本,或使用可选的 TState state
参数。
我对 Reactive Extensions 很陌生,想根据时间或 运行 总和不超过阈值(每个项目的大小由 lambda 指定)来缓冲流,以先发生者为准,很像现有的 Buffer
按计数或时间。
目前我已经编写了自己的 Buffer
方法实现,它按预期工作,使用 IScheduler
触发超时,然后在内存中管理我自己的缓冲区并在累计总和超过了阈值,但这感觉有点低,我认为必须有一个更优雅的解决方案来以某种方式使用现有的反应操作来表达它,也许使用 Buffer
的 TBufferClosing
重载相反。
到目前为止我想出的最佳解决方案如下,但它的缺点是包含导致阈值导致总和大于请求的最大总和的最后一项:
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
{
var shared = source.Publish().RefCount();
return shared.Buffer(() => Observable.Amb(
Observable.Timer(timeSpan)
.Select(_ => Unit.Default),
shared.Select(sizeSelector)
.Scan((a, b) => a + b)
.SkipWhile(accumulated => accumulated < maxSize)
.Select(_ => Unit.Default))
);
}
这是否可以与现有的操作符一起工作(通过调整我上面的版本或完全以其他方式),或者我是否被迫保留我的自定义 Buffer
实现处理计时器并缓冲自己?
好的,这应该可以。迟到总比没有好。我认为没有比使用 Buffer
运算符做得更好的方法了。
问题的核心是状态机问题,这意味着您需要 Scan
解决方案。这样做的问题是,您有两个不同的来源可以改变您的状态:新项目和超时。 Scan
并不真正适用于两个多源,因此我们必须以某种方式将这两种事件类型合并为一个。
我做了 something similar before with Discriminated Unions,这个概念在这里应该行得通。首先是解决方案(使用 Nuget 包 System.Collections.Immutable
):
public static class X
{
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); //our time-out mechanism
return source
.Publish(_source => _source
.Union(queue.Delay(bufferTimeSpan))
.ScanUnion(
(list: ImmutableList<TSource>.Empty, size: 0, emitValue: (ImmutableList<TSource>)null),
(state, item) =>
{ // item handler
var itemSize = sizeSelector(item);
var newSize = state.size + itemSize;
if (newSize > maxSize)
{
queue.OnNext(Unit.Default);
return (ImmutableList<TSource>.Empty.Add(item), itemSize, state.list);
}
else
return (state.list.Add(item), newSize, null);
},
(state, _) =>
{ // time out handler
queue.OnNext(Unit.Default);
return (ImmutableList<TSource>.Empty, 0, state.list);
}
)
.Where(t => t.emitValue != null)
.Select(t => t.emitValue.ToList())
.TakeUntil(_source.IgnoreElements().Delay(bufferTimeSpan).Materialize())
);
}
}
说明:Union
将两个不同类型的流合并为一个流,其中项目可以是类型 A 或类型 B。ScanUnion
的工作方式与 Scan
相同,但是提供两个函数来处理两种不同类型的项目。
只要新缓冲区 window 打开,就会命中 BehaviorSubject
,Delay
运算符确保 Scan
在定义的时间跨度后获取它。 Scan
中的状态包含当前缓冲项的列表,'size' 中的状态。 emitValue
在缓冲区 window 关闭时使用,并传递值。
这是 Discriminated Union 帮助程序代码:
public static class DUnionExtensions
{
public class DUnion<T1, T2>
{
public DUnion(T1 t1)
{
Type1Item = t1;
Type2Item = default(T2);
IsType1 = true;
}
public DUnion(T2 t2, bool ignored) //extra parameter to disambiguate in case T1 == T2
{
Type2Item = t2;
Type1Item = default(T1);
IsType1 = false;
}
public bool IsType1 { get; }
public bool IsType2 => !IsType1;
public T1 Type1Item { get; }
public T2 Type2Item { get; }
}
public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
{
return a.Select(x => new DUnion<T1, T2>(x))
.Merge(b.Select(x => new DUnion<T1, T2>(x, false)));
}
public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
TState initialState,
Func<TState, T1, TState> type1Handler,
Func<TState, T2, TState> type2Handler)
{
return source.Scan(initialState, (state, u) => u.IsType1
? type1Handler(state, u.Type1Item)
: type2Handler(state, u.Type2Item)
);
}
}
用函数式编程解决这个特殊问题可能会变得相当复杂。命令式方法更易于编码,并且更容易处理极端情况恕我直言。
public static IObservable<IList<TSource>> Buffer<TSource>(
this IObservable<TSource> source,
Func<TSource, int> sizeSelector,
int maxSize, TimeSpan timeSpan)
{
return Observable.Create<IList<TSource>>(observer =>
{
var locker = new object();
var buffer = new List<TSource>();
int sumSize = 0;
Timer timer = null;
void EmitBuffer()
{
Debug.Assert(Monitor.IsEntered(locker));
timer?.Dispose();
observer.OnNext(buffer);
buffer = new List<TSource>();
sumSize = 0;
timer = new Timer(OnTick, buffer, timeSpan, Timeout.InfiniteTimeSpan);
}
void OnTick(object state)
{
// Only emit in case the timer is associated with the current buffer
lock (locker) if (ReferenceEquals(state, buffer)) EmitBuffer();
}
timer = new Timer(OnTick, buffer, timeSpan, Timeout.InfiniteTimeSpan);
return source.Subscribe(value =>
{
// It is possible to emit two buffers at once
lock (locker)
{
var size = sizeSelector(value);
if (sumSize + size > maxSize && buffer.Count > 0) EmitBuffer();
buffer.Add(value);
sumSize += size;
if (sumSize >= maxSize) EmitBuffer();
}
}, exception =>
{
lock (locker)
{
timer?.Dispose();
buffer = null; // Cleanup
observer.OnError(exception);
}
}, () =>
{
lock (locker)
{
timer?.Dispose();
if (buffer.Count > 0) observer.OnNext(buffer);
buffer = null; // Cleanup
observer.OnCompleted();
}
});
});
}
注意: 此实现假定 sizeSelector
委托从不抛出。如果不是这种情况,您需要将 Subscribe
调用替换为找到 here 的自定义 SubscribeSafe
扩展方法。否则 sizeSelector
抛出的异常将得不到处理并导致进程崩溃。
此外,如果您决定使用 IScheduler scheduler
参数增强此方法,请注意不要关闭 buffer
,因为引用是可变的。这将导致错误:scheduler.Schedule(() => observer.OnNext(buffer))
。而是使用引用的本地副本,或使用可选的 TState state
参数。