如何合并多个具有顺序保留和最大并发性的可观察对象?

How to merge multiple observables with order preservation and maximum concurrency?

我搜索了一个副本,但没有找到。我有一个嵌套的可观察对象 IObservable<IObservable<T>>,我想将它展平为 IObservable<T>。我不想使用 Concat operator because it delays the subscription to each inner observable until the completion of the previous observable. This is a problem because the inner observables are cold, and I want them to start emitting T values immediately after they are emitted by the outer observable. I also don't want to use the Merge 运算符,因为它会打乱发出值的顺序。下面的大理石图显示了 Merge 运算符的有问题的(对于我的情况)行为,以及理想的合并行为。

Stream of observables: +----1------2-----3----|
Observable-1         :      +--A-----------------B-------|
Observable-2         :             +---C---------------------D------|
Observable-3         :                   +--E--------------------F-------|
Merge (undesirable)  : +-------A-------C----E----B-----------D---F-------|
Desirable merging    : +-------A-----------------B-------C---D------EF---|

Observable-1 发出的所有值都应先于 Observable-2 发出的任何值。 Observable-2 和 Observable-3 等应该也是如此。

我喜欢 Merge 运算符的地方在于它允许配置对内部可观察对象的最大并发订阅。我想使用我尝试实现的自定义 MergeOrdered 运算符来保留此功能。这是我正在构建的方法:

public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)
{
    return source.Merge(maximumConcurrency); // How to make it ordered?
}

这是一个用法示例:

var source = Observable
    .Interval(TimeSpan.FromMilliseconds(300))
    .Take(4)
    .Select(x => Observable
        .Interval(TimeSpan.FromMilliseconds(200))
        .Select(y => $"{x + 1}-{(char)(65 + y)}")
        .Take(3));

var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");

输出(不理想):

Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C

理想的输出是:

Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C

澄清:关于值的排序,值本身是无关紧要的。重要的是它们起源的内部序列的顺序,以及它们在该序列中的位置。第一个内部序列中的所有值都应首先发出(按其原始顺序),然​​后是第二个内部序列中的所有值,然后是第三个内部序列中的所有值,依此类推

这个 observable 无法知道任何内部 observable 的最后一个值是否是应该产生的第一个值。

举个例子,你可以这样:

Stream of observables: +--1---2---3--|
Observable-1         :    +------------B--------A-|
Observable-2         :        +--C--------D-|
Observable-3         :            +-E--------F-|
Desirable merging    : +------------------------ABCDEF|

在这种情况下,我会这样做:

IObservable<char> query =
    sources
        .ToObservable()
        .Merge()
        .ToArray()
        .SelectMany(xs => xs.OrderBy(x => x));

我找到了解决这个问题的方法,方法是结合使用 Merge, Merge(1)¹ and Replay 运算符。 Merge 运算符执行并发策略,Merge(1) 运算符执行有序的顺序发射。为了防止 Merge 弄乱发出的值的顺序,引入了内部序列的额外包装。每个内部序列都被投影到一个 IObservable<IObservable<T>> ,它立即发出内部序列,并在内部序列完成时完成。这种包装是使用 Observable.Create 方法实现的:

public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)
{
    return source.Select(inner => inner.Replay(buffered => Observable
        .Create<IObservable<T>>(observer =>
    {
        observer.OnNext(buffered);
        return buffered.Subscribe(_ => { }, observer.OnError, observer.OnCompleted);
    })))
    .Merge(maximumConcurrency)
    .Merge(1);
}

Replay 运算符缓冲内部序列发出的所有消息,因此它们不会在 Merge 订阅和 Merge(1).

有趣的是,由于换行,创建了一个中间 IObservable<IObservable<IObservable<T>>> 序列。然后这个可怕的东西被解包了两次,第一次是 Merge,第二次是 Merge(1) 运算符。

这不是一个非常有效的解决方案,因为没有理由缓冲 Merge(1) 当前订阅的内部序列。优化这种低效率并不是微不足道的,所以我将保持原样。在每个子序列包含少量元素的场景中,这个缺陷的影响应该可以忽略不计。在这些情况下,尝试修复它甚至可能弊大于利。

¹ 理想情况下,我想使用 Concat 而不是等效但效率较低的 Merge(1) 运算符。不幸的是,当前版本的 Rx 库 (5.0.0) 中的 Concat 运算符 behaves weirdly。在相当复杂的查询中使用 Concat 时,我什至遇到了死锁行为,通过切换到 Merge(1) 运算符解决了这个问题。


注意: 此答案的原始实现,具有 SemaphoreSlim for controlling the concurrency instead of the Merge operator, can be found in the 1st revision。基于 Merge 的实现应该更好,因为它不涉及即发即弃的任务延续,并且对内部序列的订阅是同步发生的,而不是卸载到 ThreadPool.