限制 Observable GroupBy / Merge Combination 的并发
Limiting concurrency of Observable GroupBy / Merge Combination
我们正在使用 C# 和 Reactive Extensions 实现一些软件组件。它包含使用 GroupBy
方法拆分 observable 的功能,然后对这些拆分的 observable 执行一些算术运算,然后使用 Merge()
方法将 observable 合并回一起。
如果不使用 maxConcurrent
参数,一切顺利。因为如果使用这个参数,数据好像是'lost'.
尝试搜索此问题。试图合并 Observable.Start
和 Observable.Defer
但没有结果。创建了一个真正的小型测试应用程序来展示问题。
var sourceObservable = Enumerable.Range(0, 10)
.Select(x => new { Index = x, Remainder = x % 3 }).ToObservable();
var ungrouped = sourceObservable.Select(x => x.Index);
var limitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
.Select(group => group.Select(x => x.Index)).Merge(maxConcurrent: 2);
var unlimitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
.Select(group => group.Select(x => x.Index)).Merge();
Console.WriteLine($"ungrouped: {string.Join(",", await ungrouped.ToList())}");
Console.WriteLine($"limited: {string.Join(",", await limitedGrouping.ToList())}");
Console.WriteLine($"unlimited: {string.Join(",", await unlimitedGrouping.ToList())}");
预计在这种情况下,'limitedGrouping' 内容将与 'unlimitedGrouping' 内容相同。然而它不是:
未分组:0,1,2,3,4,5,6,7,8,9
有限:0,1,3,4,6,7,9
无限制:0,1,2,3,4,5,6,7,8,9
限量版缺少数据编号 2、5 和 8。我们在这里犯了什么错误?
看起来像 GroupBy
中的 intended-but-confusing 功能。此代码是等效的,但同样会失败:
var source = Observable.Range(0, 10);
source
.GroupBy(i => i % 3)
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9
这段代码类似,但是成功了:
var a = source.Where(i => i % 3 == 0);
var b = source.Where(i => i % 3 == 1);
var c = source.Where(i => i % 3 == 2);
var l = new List<IObservable<int>>() { a, b, c };
l.ToObservable()
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9 2 5 8
有点迷幻的是:
source
.GroupBy(i => i % 3)
.Concat() //or .Merge(1), those are roughly equivalent.
.Subscribe(Console.WriteLine); //Outputs 0 3 6 9
当我第一次看到这个时,我预计所有 Merge(2)
个案例都是 0 1 3 4 6 7 9 2 5 8
。我预计 Concat
,基本上 Merge(1)
是 0 3 6 9 1 4 7 2 5 8
。
maxConcurrent(n)
表示一次只能订阅 n
个可观察对象。如果它接收到超过 n
个 observables,那么它会将额外的 observables 排队,并在旧的 observables 结束时稍后订阅。
在我们的例子中,它按顺序接收三个可观察值(mod-0、mod-1 和 mod-2)。它订阅前两个,然后将 mod-2 observable 排队,仅在 mod-0 或 mod-1 完成时订阅。然而,当 mod-0/mod-1 observables 完成时,mod-2 observable 显然也完成了,所以没有收到通知。
刚开始看的时候以为是bug,因为我觉得GroupBy
的child-observables应该是冷的。但看起来他们集体很热情,如果这有任何意义的话:订阅其中一个 children,其他人变得很热。这在 GroupBy
可以用作冷或热可观察对象的运算符的上下文中是有意义的,并且没有内置重放功能。
如果你想看到这个演示,考虑这个:
source
.GroupBy(i => i % 3)
.Select(o => o.Take(3))
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 8
这里的 mod-0 observable 从 6 之后退订,第三个 mod-0 数字。 Merge
然后订阅热 mod-2 observable,输出最后一个 mod-2 数字 8.
希望对您有所帮助。如果您不熟悉 System.Reactive 可观测温度的概念,我推荐 this article。
正如 Shlomo 在 中解释的那样,GroupBy
发出的分组子序列是热的,这意味着无论它们是否被订阅,它们都会开始发出值。因此,如果您不在它们创建后立即订阅它们,您就有可能失去它们的某些价值。实际上,可以保证您至少会丢失一个值,因为每个子序列都是使用已知的第一个值创建的,该值是在所有同步订阅可能发生后立即同步发出的。
另一方面,带有 maxConcurrent
参数的 Merge
运算符通过推迟对某些发出的子序列的订阅来实现并发限制。所以组合 GroupBy
和 Merge
很容易丢失值。以下是您的示例中发生的情况:
Source: +---0---1---2---3---4---5---6---7---8---9---|
Mod-0: +0----------3-----------6-----------9---|
Mod-1: +1----------4-----------7-----------|
Mod-2: +2----------5-----------8-------|
Merge(2): +----0---1------3---4-------6---7-------9---|
Mod-2 子序列在 Mod-0 完成时被订阅,此时它已经发出了所有值。
我能想到的这个问题的唯一解决方案是使用 Replay
运算符使所有分组的子序列可重播。这个运算符 returns 一个 IConnectableObservable<T>
您应该立即连接到它,所以您应该使用 AutoConnect(0)
运算符而不是通常的 RefCount
运算符:
var limitedGrouping = sourceObservable
.GroupBy(x => x.Remainder)
.Select(group => group.Replay().AutoConnect(0).Select(x => x.Index))
.Merge(maxConcurrent: 2);
当然这个解决方案有增加内存使用的缺点。视情况而定,这可能是可以的,也可能是不可接受的。
我们正在使用 C# 和 Reactive Extensions 实现一些软件组件。它包含使用 GroupBy
方法拆分 observable 的功能,然后对这些拆分的 observable 执行一些算术运算,然后使用 Merge()
方法将 observable 合并回一起。
如果不使用 maxConcurrent
参数,一切顺利。因为如果使用这个参数,数据好像是'lost'.
尝试搜索此问题。试图合并 Observable.Start
和 Observable.Defer
但没有结果。创建了一个真正的小型测试应用程序来展示问题。
var sourceObservable = Enumerable.Range(0, 10)
.Select(x => new { Index = x, Remainder = x % 3 }).ToObservable();
var ungrouped = sourceObservable.Select(x => x.Index);
var limitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
.Select(group => group.Select(x => x.Index)).Merge(maxConcurrent: 2);
var unlimitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
.Select(group => group.Select(x => x.Index)).Merge();
Console.WriteLine($"ungrouped: {string.Join(",", await ungrouped.ToList())}");
Console.WriteLine($"limited: {string.Join(",", await limitedGrouping.ToList())}");
Console.WriteLine($"unlimited: {string.Join(",", await unlimitedGrouping.ToList())}");
预计在这种情况下,'limitedGrouping' 内容将与 'unlimitedGrouping' 内容相同。然而它不是:
未分组:0,1,2,3,4,5,6,7,8,9
有限:0,1,3,4,6,7,9
无限制:0,1,2,3,4,5,6,7,8,9
限量版缺少数据编号 2、5 和 8。我们在这里犯了什么错误?
看起来像 GroupBy
中的 intended-but-confusing 功能。此代码是等效的,但同样会失败:
var source = Observable.Range(0, 10);
source
.GroupBy(i => i % 3)
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9
这段代码类似,但是成功了:
var a = source.Where(i => i % 3 == 0);
var b = source.Where(i => i % 3 == 1);
var c = source.Where(i => i % 3 == 2);
var l = new List<IObservable<int>>() { a, b, c };
l.ToObservable()
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9 2 5 8
有点迷幻的是:
source
.GroupBy(i => i % 3)
.Concat() //or .Merge(1), those are roughly equivalent.
.Subscribe(Console.WriteLine); //Outputs 0 3 6 9
当我第一次看到这个时,我预计所有 Merge(2)
个案例都是 0 1 3 4 6 7 9 2 5 8
。我预计 Concat
,基本上 Merge(1)
是 0 3 6 9 1 4 7 2 5 8
。
maxConcurrent(n)
表示一次只能订阅 n
个可观察对象。如果它接收到超过 n
个 observables,那么它会将额外的 observables 排队,并在旧的 observables 结束时稍后订阅。
在我们的例子中,它按顺序接收三个可观察值(mod-0、mod-1 和 mod-2)。它订阅前两个,然后将 mod-2 observable 排队,仅在 mod-0 或 mod-1 完成时订阅。然而,当 mod-0/mod-1 observables 完成时,mod-2 observable 显然也完成了,所以没有收到通知。
刚开始看的时候以为是bug,因为我觉得GroupBy
的child-observables应该是冷的。但看起来他们集体很热情,如果这有任何意义的话:订阅其中一个 children,其他人变得很热。这在 GroupBy
可以用作冷或热可观察对象的运算符的上下文中是有意义的,并且没有内置重放功能。
如果你想看到这个演示,考虑这个:
source
.GroupBy(i => i % 3)
.Select(o => o.Take(3))
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 8
这里的 mod-0 observable 从 6 之后退订,第三个 mod-0 数字。 Merge
然后订阅热 mod-2 observable,输出最后一个 mod-2 数字 8.
希望对您有所帮助。如果您不熟悉 System.Reactive 可观测温度的概念,我推荐 this article。
正如 Shlomo 在 GroupBy
发出的分组子序列是热的,这意味着无论它们是否被订阅,它们都会开始发出值。因此,如果您不在它们创建后立即订阅它们,您就有可能失去它们的某些价值。实际上,可以保证您至少会丢失一个值,因为每个子序列都是使用已知的第一个值创建的,该值是在所有同步订阅可能发生后立即同步发出的。
另一方面,带有 maxConcurrent
参数的 Merge
运算符通过推迟对某些发出的子序列的订阅来实现并发限制。所以组合 GroupBy
和 Merge
很容易丢失值。以下是您的示例中发生的情况:
Source: +---0---1---2---3---4---5---6---7---8---9---|
Mod-0: +0----------3-----------6-----------9---|
Mod-1: +1----------4-----------7-----------|
Mod-2: +2----------5-----------8-------|
Merge(2): +----0---1------3---4-------6---7-------9---|
Mod-2 子序列在 Mod-0 完成时被订阅,此时它已经发出了所有值。
我能想到的这个问题的唯一解决方案是使用 Replay
运算符使所有分组的子序列可重播。这个运算符 returns 一个 IConnectableObservable<T>
您应该立即连接到它,所以您应该使用 AutoConnect(0)
运算符而不是通常的 RefCount
运算符:
var limitedGrouping = sourceObservable
.GroupBy(x => x.Remainder)
.Select(group => group.Replay().AutoConnect(0).Select(x => x.Index))
.Merge(maxConcurrent: 2);
当然这个解决方案有增加内存使用的缺点。视情况而定,这可能是可以的,也可能是不可接受的。