按时间缓冲或 运行 响应式扩展总和

Buffer by time or running sum for reactive extensions

我对 Reactive Extensions 很陌生,想根据时间或 运行 总和不超过阈值(每个项目的大小由 lambda 指定)来缓冲流,以先发生者为准,很像现有的 Buffer 按计数或时间。

目前我已经编写了自己的 Buffer 方法实现,它按预期工作,使用 IScheduler 触发超时,然后在内存中管理我自己的缓冲区并在累计总和超过了阈值,但这感觉有点低,我认为必须有一个更优雅的解决方案来以某种方式使用现有的反应操作来表达它,也许使用 BufferTBufferClosing 重载相反。

到目前为止我想出的最佳解决方案如下,但它的缺点是包含导致阈值导致总和大于请求的最大总和的最后一项:

    public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan) 
    {
        var shared = source.Publish().RefCount();
    
        return shared.Buffer(() => Observable.Amb(
            Observable.Timer(timeSpan)
                .Select(_ => Unit.Default),
            shared.Select(sizeSelector)
                .Scan((a, b) => a + b)
                .SkipWhile(accumulated => accumulated < maxSize)
                .Select(_ => Unit.Default))
            );
    }

这是否可以与现有的操作符一起工作(通过调整我上面的版本或完全以其他方式),或者我是否被迫保留我的自定义 Buffer 实现处理计时器并缓冲自己?

好的,这应该可以。迟到总比没有好。我认为没有比使用 Buffer 运算符做得更好的方法了。

问题的核心是状态机问题,这意味着您需要 Scan 解决方案。这样做的问题是,您有两个不同的来源可以改变您的状态:新项目和超时。 Scan 并不真正适用于两个多源,因此我们必须以某种方式将这两种事件类型合并为一个。

我做了 something similar before with Discriminated Unions,这个概念在这里应该行得通。首先是解决方案(使用 Nuget 包 System.Collections.Immutable):

public static class X
{
    public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); //our time-out mechanism

        return source
            .Publish(_source => _source
                .Union(queue.Delay(bufferTimeSpan))
                .ScanUnion(
                    (list: ImmutableList<TSource>.Empty, size: 0, emitValue: (ImmutableList<TSource>)null),
                    (state, item) =>
                    { // item handler
                        var itemSize = sizeSelector(item);
                        var newSize = state.size + itemSize;
                        if (newSize > maxSize)
                        {
                            queue.OnNext(Unit.Default);
                            return (ImmutableList<TSource>.Empty.Add(item), itemSize, state.list);
                        }
                        else
                            return (state.list.Add(item), newSize, null);
                    },
                    (state, _) =>
                    { // time out handler
                        queue.OnNext(Unit.Default);
                        return (ImmutableList<TSource>.Empty, 0, state.list);
                    }
                )
                .Where(t => t.emitValue != null)
                .Select(t => t.emitValue.ToList())
                .TakeUntil(_source.IgnoreElements().Delay(bufferTimeSpan).Materialize())
        );
    }
}

说明:Union将两个不同类型的流合并为一个流,其中项目可以是类型 A 或类型 B。ScanUnion 的工作方式与 Scan 相同,但是提供两个函数来处理两种不同类型的项目。

只要新缓冲区 window 打开,就会命中 BehaviorSubjectDelay 运算符确保 Scan 在定义的时间跨度后获取它。 Scan 中的状态包含当前缓冲项的列表,'size' 中的状态。 emitValue 在缓冲区 window 关闭时使用,并传递值。

这是 Discriminated Union 帮助程序代码:

public static class DUnionExtensions
{
    public class DUnion<T1, T2>
    {
        public DUnion(T1 t1)
        {
            Type1Item = t1;
            Type2Item = default(T2);
            IsType1 = true;
        }

        public DUnion(T2 t2, bool ignored) //extra parameter to disambiguate in case T1 == T2
        {
            Type2Item = t2;
            Type1Item = default(T1);
            IsType1 = false;
        }

        public bool IsType1 { get; }
        public bool IsType2 => !IsType1;

        public T1 Type1Item { get; }
        public T2 Type2Item { get; }
    }

    public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return a.Select(x => new DUnion<T1, T2>(x))
            .Merge(b.Select(x => new DUnion<T1, T2>(x, false)));
    }

    public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
            TState initialState,
            Func<TState, T1, TState> type1Handler,
            Func<TState, T2, TState> type2Handler)
    {
        return source.Scan(initialState, (state, u) => u.IsType1
            ? type1Handler(state, u.Type1Item)
            : type2Handler(state, u.Type2Item)
        );
    }
}

用函数式编程解决这个特殊问题可能会变得相当复杂。命令式方法更易于编码,并且更容易处理极端情况恕我直言。

public static IObservable<IList<TSource>> Buffer<TSource>(
    this IObservable<TSource> source,
    Func<TSource, int> sizeSelector,
    int maxSize, TimeSpan timeSpan)
{
    return Observable.Create<IList<TSource>>(observer =>
    {
        var locker = new object();
        var buffer = new List<TSource>();
        int sumSize = 0;
        Timer timer = null;

        void EmitBuffer()
        {
            Debug.Assert(Monitor.IsEntered(locker));
            timer?.Dispose();
            observer.OnNext(buffer);
            buffer = new List<TSource>();
            sumSize = 0;
            timer = new Timer(OnTick, buffer, timeSpan, Timeout.InfiniteTimeSpan);
        }

        void OnTick(object state)
        {
            // Only emit in case the timer is associated with the current buffer
            lock (locker) if (ReferenceEquals(state, buffer)) EmitBuffer();
        }

        timer = new Timer(OnTick, buffer, timeSpan, Timeout.InfiniteTimeSpan);

        return source.Subscribe(value =>
        {
            // It is possible to emit two buffers at once
            lock (locker)
            {
                var size = sizeSelector(value);
                if (sumSize + size > maxSize && buffer.Count > 0) EmitBuffer();
                buffer.Add(value);
                sumSize += size;
                if (sumSize >= maxSize) EmitBuffer();
            }
        }, exception =>
        {
            lock (locker)
            {
                timer?.Dispose();
                buffer = null; // Cleanup
                observer.OnError(exception);
            }
        }, () =>
        {
            lock (locker)
            {
                timer?.Dispose();
                if (buffer.Count > 0) observer.OnNext(buffer);
                buffer = null; // Cleanup
                observer.OnCompleted();
            }
        });
    });
}

注意: 此实现假定 sizeSelector 委托从不抛出。如果不是这种情况,您需要将 Subscribe 调用替换为找到 here 的自定义 SubscribeSafe 扩展方法。否则 sizeSelector 抛出的异常将得不到处理并导致进程崩溃。

此外,如果您决定使用 IScheduler scheduler 参数增强此方法,请注意不要关闭 buffer,因为引用是可变的。这将导致错误:scheduler.Schedule(() => observer.OnNext(buffer))。而是使用引用的本地副本,或使用可选的 TState state 参数。