限制 Observable GroupBy / Merge Combination 的并发

Limiting concurrency of Observable GroupBy / Merge Combination

我们正在使用 C# 和 Reactive Extensions 实现一些软件组件。它包含使用 GroupBy 方法拆分 observable 的功能,然后对这些拆分的 observable 执行一些算术运算,然后使用 Merge() 方法将 observable 合并回一起。

如果不使用 maxConcurrent 参数,一切顺利。因为如果使用这个参数,数据好像是'lost'.

尝试搜索此问题。试图合并 Observable.StartObservable.Defer 但没有结果。创建了一个真正的小型测试应用程序来展示问题。

var sourceObservable = Enumerable.Range(0, 10)
    .Select(x => new { Index = x, Remainder = x % 3 }).ToObservable();

var ungrouped = sourceObservable.Select(x => x.Index);
var limitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
    .Select(group => group.Select(x => x.Index)).Merge(maxConcurrent: 2);
var unlimitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
    .Select(group => group.Select(x => x.Index)).Merge();

Console.WriteLine($"ungrouped: {string.Join(",", await ungrouped.ToList())}");
Console.WriteLine($"limited: {string.Join(",", await limitedGrouping.ToList())}");
Console.WriteLine($"unlimited: {string.Join(",", await unlimitedGrouping.ToList())}");

预计在这种情况下,'limitedGrouping' 内容将与 'unlimitedGrouping' 内容相同。然而它不是:

未分组:0,1,2,3,4,5,6,7,8,9

有限:0,1,3,4,6,7,9

无限制:0,1,2,3,4,5,6,7,8,9

限量版缺少数据编号 2、5 和 8。我们在这里犯了什么错误?

看起来像 GroupBy 中的 intended-but-confusing 功能。此代码是等效的,但同样会失败:

var source = Observable.Range(0, 10);
source
    .GroupBy(i => i % 3)
    .Merge(2)
    .Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9

这段代码类似,但是成功了:

var a = source.Where(i => i % 3 == 0);
var b = source.Where(i => i % 3 == 1);
var c = source.Where(i => i % 3 == 2);
var l = new List<IObservable<int>>() { a, b, c };
l.ToObservable()
    .Merge(2)
    .Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9 2 5 8

有点迷幻的是:

source
    .GroupBy(i => i % 3)
    .Concat() //or .Merge(1), those are roughly equivalent.
    .Subscribe(Console.WriteLine); //Outputs 0 3 6 9

当我第一次看到这个时,我预计所有 Merge(2) 个案例都是 0 1 3 4 6 7 9 2 5 8。我预计 Concat,基本上 Merge(1)0 3 6 9 1 4 7 2 5 8

maxConcurrent(n) 表示一次只能订阅 n 个可观察对象。如果它接收到超过 n 个 observables,那么它会将额外的 observables 排队,并在旧的 observables 结束时稍后订阅。

在我们的例子中,它按顺序接收三个可观察值(mod-0、mod-1 和 mod-2)。它订阅前两个,然后将 mod-2 observable 排队,仅在 mod-0 或 mod-1 完成时订阅。然而,当 mod-0/mod-1 observables 完成时,mod-2 observable 显然也完成了,所以没有收到通知。

刚开始看的时候以为是bug,因为我觉得GroupBy的child-observables应该是冷的。但看起来他们集体很热情,如果这有任何意义的话:订阅其中一个 children,其他人变得很热。这在 GroupBy 可以用作冷或热可观察对象的运算符的上下文中是有意义的,并且没有内置重放功能。

如果你想看到这个演示,考虑这个:

source
    .GroupBy(i => i % 3)
    .Select(o => o.Take(3))
    .Merge(2)
    .Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 8

这里的 mod-0 observable 从 6 之后退订,第三个 mod-0 数字。 Merge 然后订阅热 mod-2 observable,输出最后一个 mod-2 数字 8.

希望对您有所帮助。如果您不熟悉 System.Reactive 可观测温度的概念,我推荐 this article

正如 Shlomo 在 中解释的那样,GroupBy 发出的分组子序列是热的,这意味着无论它们是否被订阅,它们都会开始发出值。因此,如果您不在它们创建后立即订阅它们,您就有可能失去它们的某些价值。实际上,可以保证您至少会丢失一个值,因为每个子序列都是使用已知的第一个值创建的,该值是在所有同步订阅可能发生后立即同步发出的。

另一方面,带有 maxConcurrent 参数的 Merge 运算符通过推迟对某些发出的子序列的订阅来实现并发限制。所以组合 GroupByMerge 很容易丢失值。以下是您的示例中发生的情况:

Source:   +---0---1---2---3---4---5---6---7---8---9---|
Mod-0:        +0----------3-----------6-----------9---|
Mod-1:            +1----------4-----------7-----------|
Mod-2:                +2----------5-----------8-------|
Merge(2): +----0---1------3---4-------6---7-------9---|

Mod-2 子序列在 Mod-0 完成时被订阅,此时它已经发出了所有值。

我能想到的这个问题的唯一解决方案是使用 Replay 运算符使所有分组的子序列可重播。这个运算符 returns 一个 IConnectableObservable<T> 您应该立即连接到它,所以您应该使用 AutoConnect(0) 运算符而不是通常的 RefCount 运算符:

var limitedGrouping = sourceObservable
    .GroupBy(x => x.Remainder)
    .Select(group => group.Replay().AutoConnect(0).Select(x => x.Index))
    .Merge(maxConcurrent: 2);

当然这个解决方案有增加内存使用的缺点。视情况而定,这可能是可以的,也可能是不可接受的。