How to merge a nested observable IObservable<IObservable<T>> with limited concurrency and limited buffer capacity?

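I noticed that the Rx Merge operator accepts an optional maxConcurrent parameter. It limits the maximum concurrency by subscribing to only a limited number of subsequences at a time. This works perfectly when new subsequences are pushed at a slower rate than the subscribed subsequences complete, but it becomes problematic when new subsequences are pushed faster than that. The subsequences are then stored in an internal buffer whose size keeps growing, and the currently subscribed subsequences become older and older. Here is a demonstration of the problem: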

await Observable
    .Generate(0, _ => true, x => x, x => x, _ => TimeSpan.FromMilliseconds(10))
    .Select(_ => Observable
        .Return(DateTime.Now)
        .Do(d => Console.WriteLine(
            $"Then: {d:HH:mm:ss.fff}, " +
            $"Now: {DateTime.Now:HH:mm:ss.fff}, " +
            $"TotalMemory: {GC.GetTotalMemory(true):#,0} bytes"))
        .Delay(TimeSpan.FromMilliseconds(1000)))
    .Merge(maxConcurrent: 1)
    .Take(10);

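A new subsequence is pushed every 10 milliseconds, and each subsequence completes after 1000 milliseconds. The subsequences are merged with a maximum concurrency of 1 (sequentially).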

Output:

Then: 12:45:34.019, Now: 12:45:34.054, TotalMemory: 117,040 bytes
Then: 12:45:34.082, Now: 12:45:35.088, TotalMemory: 139,336 bytes
Then: 12:45:34.093, Now: 12:45:36.094, TotalMemory: 146,336 bytes
Then: 12:45:34.114, Now: 12:45:37.098, TotalMemory: 153,216 bytes
Then: 12:45:34.124, Now: 12:45:38.109, TotalMemory: 159,272 bytes
Then: 12:45:34.145, Now: 12:45:39.126, TotalMemory: 167,608 bytes
Then: 12:45:34.156, Now: 12:45:40.141, TotalMemory: 173,952 bytes
Then: 12:45:34.177, Now: 12:45:41.147, TotalMemory: 180,432 bytes
Then: 12:45:34.188, Now: 12:45:42.164, TotalMemory: 186,808 bytes
Then: 12:45:34.209, Now: 12:45:43.175, TotalMemory: 197,208 bytes

The memory usage grows steadily, and so does the time gap between the creation and the subscription of each subsequence.
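As a rough illustration of how fast the backlog grows, here is a small idealized model. It is only a sketch: the `Backlog` helper is hypothetical, and it ignores timer resolution and scheduling overhead, so real numbers will differ.

```csharp
using System;

public static class BacklogSketch
{
    // Idealized count of inner sequences still waiting after elapsedMs,
    // when one arrives every arrivalMs and one is started every serviceMs
    // (the maxConcurrent: 1 case). Hypothetical helper, not part of Rx.
    public static long Backlog(long elapsedMs, long arrivalMs, long serviceMs)
    {
        long arrived = elapsedMs / arrivalMs;
        long started = Math.Min(arrived, elapsedMs / serviceMs + 1);
        return arrived - started;
    }
}
```

With the numbers from the demonstration, `BacklogSketch.Backlog(10_000, 10, 1_000)` gives 989 in this model: after ten seconds almost every subsequence pushed so far is still sitting in the buffer.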

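What I would like to have is a custom Merge variant with an internal buffer of limited size. When the buffer is full, any incoming subsequence should cause the oldest buffered subsequence to be dropped. Here is a marble diagram of the desired behavior, configured with maximum concurrency = 1 and buffer capacity = 1: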

Source: +----A------B------C------|
A:           +-------a----a---|
B:                  not-subscribed
C:                            +-----c----|
Result: +------------a----a---------c----|

How could I implement a custom Rx operator with this specific behavior? Here is the stub of the operator I am trying to implement:

public static IObservable<T> MergeBounded<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency,
    int boundedCapacity)
{
    return source.Merge(maximumConcurrency);
    // TODO: enforce the boundedCapacity policy somehow
}

I came up with a functional solution. I'm not sure it's the way to go, purely because of its complexity, but I think I covered all the bases.

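First, if you take a functional approach, this is a relatively simple state-machine problem. The state needs to know how many observables are currently executing, plus the buffer queue. Two events can affect the state: a new observable arriving (which causes an enqueue on the buffer queue), or a currently executing observable terminating (which causes a dequeue on the buffer queue).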

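Since a state machine basically means Scan, and Scan can only work with one type, we have to coerce our two events into a single type, which I call Message below. The state machine then knows everything and can do the work of the Merge(n) overload.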
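The transitions described above can be sketched without any Rx, as a pure function over the state. This is only an illustration under assumptions: the names `Kind`, `Step` and `Trace` are hypothetical, and plain strings stand in for the inner observables.

```csharp
using System;
using System.Collections.Immutable;

public static class StateMachineSketch
{
    // The two events: an inner sequence arrived, or a running one completed.
    public enum Kind { Arrived, Completed }

    public static (int Running, int Buffered, ImmutableQueue<string> Buffer, string Start) Step(
        (int Running, int Buffered, ImmutableQueue<string> Buffer, string Start) s,
        Kind kind, string name, int maxConcurrency, int capacity)
    {
        if (kind == Kind.Arrived)
        {
            // A free slot exists: start the newcomer immediately.
            if (s.Running < maxConcurrency)
                return (s.Running + 1, s.Buffered, s.Buffer, name);

            // No free slot: buffer it, evicting the oldest entry when full.
            var buffer = s.Buffer.Enqueue(name);
            if (s.Buffered == capacity)
                return (s.Running, s.Buffered, buffer.Dequeue(), null);
            return (s.Running, s.Buffered + 1, buffer, null);
        }

        // Completed: free the slot, or hand it to the oldest buffered entry.
        if (s.Buffered == 0)
            return (s.Running - 1, 0, s.Buffer, null);
        return (s.Running, s.Buffered - 1, s.Buffer.Dequeue(), s.Buffer.Peek());
    }

    // Replays the scenario of the marble diagram: A, B, C arrive, then A completes.
    public static string Trace()
    {
        var s = (Running: 0, Buffered: 0,
                 Buffer: ImmutableQueue<string>.Empty, Start: (string)null);
        var started = "";
        foreach (var (kind, name) in new[] {
            (Kind.Arrived, "A"), (Kind.Arrived, "B"),
            (Kind.Arrived, "C"), (Kind.Completed, (string)null) })
        {
            s = Step(s, kind, name, 1, 1);
            if (s.Start != null) started += s.Start;
        }
        return started; // names of the entries that were started, in order
    }
}
```

With maximum concurrency 1 and capacity 1, `StateMachineSketch.Trace()` returns "AC": B is evicted from the buffer when C arrives, and C is started once A completes.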

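The last trick is the loop-back: since the completing observable is 'downstream' from Scan, we need to 'loop back' the termination of that observable into Scan. For that, I always refer back to the Drain function in [this answer][1].
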
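using System;
using System.Collections.Immutable;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class X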
{
    public static IObservable<T> MergeBounded<T>(
        this IObservable<IObservable<T>> source,
        int maximumConcurrency,
        int boundedCapacity)
    {
        return Observable.Defer(() =>
        {
            var capacityQueue = new Subject<Unit>();

            var toReturn = source.Publish(_source => _source
                .Select(o => Message.Enqueue(o))
                .Merge(capacityQueue.Select(_ => Message.Dequeue(Observable.Empty<T>())))
                .Scan((bufferCount: 0, buffer: ImmutableQueue<IObservable<T>>.Empty, executionCount: 0, item: (IObservable<T>)null), (state, message) =>
                {
                    var buffer = state.buffer;
                    var bufferCount = state.bufferCount;
                    var executionCount = state.executionCount;
                    if (message.IsEnqueue)
                    {
                        if (executionCount < maximumConcurrency)
                            return (0, ImmutableQueue<IObservable<T>>.Empty, executionCount + 1, message.Object);

                        buffer = buffer.Enqueue(message.Object);
                        if (bufferCount == boundedCapacity)
                            buffer = buffer.Dequeue();
                        else
                            bufferCount++;
                        return (bufferCount, buffer, executionCount, null);
                    }
                    else
                    {
                        if (bufferCount == 0)
                            return (0, buffer, executionCount - 1, null);
                        else
                            return (bufferCount - 1, buffer.Dequeue(), executionCount, buffer.Peek());
                    }
                })
                .Where(t => t.item != null)
                .Select(t => t.item)
                .Select(o => o.Do(_ => { }, () => capacityQueue.OnNext(Unit.Default)))
                .TakeUntil(_source.IgnoreElements().Materialize())
                .Merge()
            );

            return toReturn;
        });

    }

    public class Message
    {
        public static Message<T> Enqueue<T>(T t)
        {
            return Message<T>.Enqueue(t);
        }

        public static Message<T> Dequeue<T>(T t)
        {
            return Message<T>.Dequeue(t);
        }

    }

    public class Message<T>
    {
        private readonly T _t;
        private readonly bool _isEnqueue;
        private Message(bool isEnqueue, T t)
        {
            _t = t;
            _isEnqueue = isEnqueue;
        }
        
        public static Message<T> Enqueue(T t)
        {
            return new Message<T>(true, t);
        }

        public static Message<T> Dequeue(T t)
        {
            return new Message<T>(false, t);
        }
        
        public bool IsEnqueue => _isEnqueue;
        public T Object => _t;
    }
}

I wrote some test code (based on the original question) to verify it, in case you want to piggyback on it. The test is passing:

//              T: 0123456789012345678901234567890123
//            T10: 0         1         2         3
//         Source: +----A------B------C------|
//              A:      +-------a----a---|
//              B:             +----------b----b---|
//              C:                    +--------c----|
// ExpectedResult: +------------a----a---------c----|


// TestScheduler, ReactiveTest and ReactiveAssert come from Microsoft.Reactive.Testing.
var ts = new TestScheduler();

var A = ts.CreateHotObservable(
    ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnCompleted<string>(22 * TimeSpan.TicksPerSecond)
);
var B = ts.CreateHotObservable(
    ReactiveTest.OnNext(23 * TimeSpan.TicksPerSecond, "b"),
    ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "b"),
    ReactiveTest.OnCompleted<string>(32 * TimeSpan.TicksPerSecond)
);
var C = ts.CreateHotObservable(
    ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
    ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
var source = ts.CreateHotObservable(
    ReactiveTest.OnNext(5 * TimeSpan.TicksPerSecond, A.AsObservable()),
    ReactiveTest.OnNext(12 * TimeSpan.TicksPerSecond, B.AsObservable()),
    ReactiveTest.OnNext(19 * TimeSpan.TicksPerSecond, C.AsObservable()),
    ReactiveTest.OnCompleted<IObservable<string>>(26 * TimeSpan.TicksPerSecond)
);
var observer = ts.CreateObserver<string>();
var testResult = source.MergeBounded(1, 1);
testResult.Subscribe(observer);

var expected = ts.CreateHotObservable(
    ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
    ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
ts.Start();
//observer.Messages.Dump("Actual");   // Linqpad
//expected.Messages.Dump("Expected"); // Linqpad
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);

(The test code runs without throwing an exception.)

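Here is another implementation. It is not as feature-complete as Shlomo's, because it can't be configured with boundedCapacity: 0. The internal buffer must have a size of at least 1.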

/// <summary>
/// Merges elements from all inner observable sequences into a single observable
/// sequence, limiting the number of concurrent subscriptions to inner sequences.
/// The unsubscribed inner sequences are stored in a buffer with the specified
/// maximum capacity. When the buffer is full, the oldest inner sequence in the
/// buffer is dropped and ignored in order to make room for the latest inner
/// sequence.
/// </summary>
public static IObservable<T> MergeBounded<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency, int boundedCapacity)
{
    if (boundedCapacity < 1)
        throw new ArgumentOutOfRangeException(nameof(boundedCapacity));

    return Observable.Defer(() =>
    {
        var queue = new Queue<IObservable<T>>(boundedCapacity);
        return source
            .Select(inner =>
            {
                bool oldestDropped = false;
                lock (queue)
                {
                    if (queue.Count == boundedCapacity)
                    {
                        queue.Dequeue(); oldestDropped = true;
                    }
                    queue.Enqueue(inner);
                }
                if (oldestDropped) return null;
                return Observable.Defer(() =>
                {
                    lock (queue) return queue.Dequeue();
                });
            })
            .Where(inner => inner != null)
            .Merge(maximumConcurrency);
    });
}

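This implementation relies on the assumption that the built-in Merge operator never subscribes more than once to the same subsequence. Otherwise the statement queue.Dequeue() could be called on an empty queue and cause an exception.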
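To see why the placeholders and the queue stay in step, here is a sketch of the same bookkeeping without Rx. It is an illustration only: `Push` and `Run` are hypothetical names, strings stand in for the inner sequences, and a `Func<string>` plays the role of the `Observable.Defer` placeholder. It covers the case where nothing is subscribed until all four entries have arrived.

```csharp
using System;
using System.Collections.Generic;

public static class PlaceholderSketch
{
    // Mirrors the Select lambda: returns a placeholder, or null when the
    // newcomer takes over the placeholder of the entry it evicted.
    public static Func<string> Push(Queue<string> queue, int capacity, string inner)
    {
        bool oldestDropped = false;
        if (queue.Count == capacity)
        {
            queue.Dequeue();
            oldestDropped = true;
        }
        queue.Enqueue(inner);
        if (oldestDropped) return null;
        return () => queue.Dequeue(); // resolved late, like Observable.Defer
    }

    public static string Run()
    {
        var queue = new Queue<string>();
        var pending = new List<Func<string>>();
        foreach (var name in new[] { "A", "B", "C", "D" })
        {
            var placeholder = Push(queue, 2, name);
            if (placeholder != null) pending.Add(placeholder);
        }
        // Each placeholder is invoked exactly once, as Merge is assumed to do.
        return string.Join(",", pending.ConvertAll(p => p()));
    }
}
```

With a capacity of 2, `PlaceholderSketch.Run()` returns "C,D": only two placeholders were ever emitted, and by the time they are resolved they yield the two newest entries. The number of unresolved placeholders always equals the queue count, which is why resolving each placeholder once never hits an empty queue.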