如何合并多个具有顺序保留和最大并发性的可观察对象?
How to merge multiple observables with order preservation and maximum concurrency?
我搜索了一个副本,但没有找到。我有一个嵌套的可观察对象 IObservable<IObservable<T>>
,我想将它展平为 IObservable<T>
。我不想使用 Concat
operator because it delays the subscription to each inner observable until the completion of the previous observable. This is a problem because the inner observables are cold, and I want them to start emitting T
values immediately after they are emitted by the outer observable. I also don't want to use the Merge
运算符,因为它会打乱发出值的顺序。下面的大理石图显示了 Merge
运算符的有问题的(对于我的情况)行为,以及理想的合并行为。
Stream of observables: +----1------2-----3----|
Observable-1 : +--A-----------------B-------|
Observable-2 : +---C---------------------D------|
Observable-3 : +--E--------------------F-------|
Merge (undesirable) : +-------A-------C----E----B-----------D---F-------|
Desirable merging : +-------A-----------------B-------C---D------EF---|
Observable-1 发出的所有值都应先于 Observable-2 发出的任何值。 Observable-2 和 Observable-3 等应该也是如此。
我喜欢 Merge
运算符的地方在于它允许配置对内部可观察对象的最大并发订阅。我想使用我尝试实现的自定义 MergeOrdered
运算符来保留此功能。这是我正在构建的方法:
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
{
return source.Merge(maximumConcurrency); // How to make it ordered?
}
这是一个用法示例:
var source = Observable
.Interval(TimeSpan.FromMilliseconds(300))
.Take(4)
.Select(x => Observable
.Interval(TimeSpan.FromMilliseconds(200))
.Select(y => $"{x + 1}-{(char)(65 + y)}")
.Take(3));
var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");
输出(不理想):
Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C
理想的输出是:
Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C
澄清:关于值的排序,值本身是无关紧要的。重要的是它们起源的内部序列的顺序,以及它们在该序列中的位置。第一个内部序列中的所有值都应首先发出(按其原始顺序),然后是第二个内部序列中的所有值,然后是第三个内部序列中的所有值,依此类推
这个 observable 无法知道任何内部 observable 的最后一个值是否是应该产生的第一个值。
举个例子,你可以这样:
Stream of observables: +--1---2---3--|
Observable-1 : +------------B--------A-|
Observable-2 : +--C--------D-|
Observable-3 : +-E--------F-|
Desirable merging : +------------------------ABCDEF|
在这种情况下,我会这样做:
IObservable<char> query =
sources
.ToObservable()
.Merge()
.ToArray()
.SelectMany(xs => xs.OrderBy(x => x));
我找到了解决这个问题的方法,方法是结合使用 Merge
, Merge(1)
¹ and Replay
运算符。 Merge
运算符执行并发策略,Merge(1)
运算符执行有序的顺序发射。为了防止 Merge
弄乱发出的值的顺序,引入了内部序列的额外包装。每个内部序列都被投影到一个 IObservable<IObservable<T>>
,它立即发出内部序列,并在内部序列完成时完成。这种包装是使用 Observable.Create
方法实现的:
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
{
return source.Select(inner => inner.Replay(buffered => Observable
.Create<IObservable<T>>(observer =>
{
observer.OnNext(buffered);
return buffered.Subscribe(_ => { }, observer.OnError, observer.OnCompleted);
})))
.Merge(maximumConcurrency)
.Merge(1);
}
Replay
运算符缓冲内部序列发出的所有消息,因此它们不会在 Merge
订阅和 Merge(1)
.
有趣的是,由于换行,创建了一个中间 IObservable<IObservable<IObservable<T>>>
序列。然后这个可怕的东西被解包了两次,第一次是 Merge
,第二次是 Merge(1)
运算符。
这不是一个非常有效的解决方案,因为没有理由缓冲 Merge(1)
当前订阅的内部序列。优化这种低效率并不是微不足道的,所以我将保持原样。在每个子序列包含少量元素的场景中,这个缺陷的影响应该可以忽略不计。在这些情况下,尝试修复它甚至可能弊大于利。
¹ 理想情况下,我想使用 Concat
而不是等效但效率较低的 Merge(1)
运算符。不幸的是,当前版本的 Rx 库 (5.0.0) 中的 Concat
运算符 behaves weirdly。在相当复杂的查询中使用 Concat
时,我什至遇到了死锁行为,通过切换到 Merge(1)
运算符解决了这个问题。
注意: 此答案的原始实现,具有 SemaphoreSlim
for controlling the concurrency instead of the Merge
operator, can be found in the 1st revision。基于 Merge
的实现应该更好,因为它不涉及即发即弃的任务延续,并且对内部序列的订阅是同步发生的,而不是卸载到 ThreadPool
.
我搜索了一个副本,但没有找到。我有一个嵌套的可观察对象 IObservable<IObservable<T>>
,我想将它展平为 IObservable<T>
。我不想使用 Concat
operator because it delays the subscription to each inner observable until the completion of the previous observable. This is a problem because the inner observables are cold, and I want them to start emitting T
values immediately after they are emitted by the outer observable. I also don't want to use the Merge
运算符,因为它会打乱发出值的顺序。下面的大理石图显示了 Merge
运算符的有问题的(对于我的情况)行为,以及理想的合并行为。
Stream of observables: +----1------2-----3----|
Observable-1 : +--A-----------------B-------|
Observable-2 : +---C---------------------D------|
Observable-3 : +--E--------------------F-------|
Merge (undesirable) : +-------A-------C----E----B-----------D---F-------|
Desirable merging : +-------A-----------------B-------C---D------EF---|
Observable-1 发出的所有值都应先于 Observable-2 发出的任何值。 Observable-2 和 Observable-3 等应该也是如此。
我喜欢 Merge
运算符的地方在于它允许配置对内部可观察对象的最大并发订阅。我想使用我尝试实现的自定义 MergeOrdered
运算符来保留此功能。这是我正在构建的方法:
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
{
return source.Merge(maximumConcurrency); // How to make it ordered?
}
这是一个用法示例:
var source = Observable
.Interval(TimeSpan.FromMilliseconds(300))
.Take(4)
.Select(x => Observable
.Interval(TimeSpan.FromMilliseconds(200))
.Select(y => $"{x + 1}-{(char)(65 + y)}")
.Take(3));
var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");
输出(不理想):
Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C
理想的输出是:
Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C
澄清:关于值的排序,值本身是无关紧要的。重要的是它们起源的内部序列的顺序,以及它们在该序列中的位置。第一个内部序列中的所有值都应首先发出(按其原始顺序),然后是第二个内部序列中的所有值,然后是第三个内部序列中的所有值,依此类推
这个 observable 无法知道任何内部 observable 的最后一个值是否是应该产生的第一个值。
举个例子,你可以这样:
Stream of observables: +--1---2---3--|
Observable-1 : +------------B--------A-|
Observable-2 : +--C--------D-|
Observable-3 : +-E--------F-|
Desirable merging : +------------------------ABCDEF|
在这种情况下,我会这样做:
IObservable<char> query =
sources
.ToObservable()
.Merge()
.ToArray()
.SelectMany(xs => xs.OrderBy(x => x));
我找到了解决这个问题的方法,方法是结合使用 Merge
, Merge(1)
¹ and Replay
运算符。 Merge
运算符执行并发策略,Merge(1)
运算符执行有序的顺序发射。为了防止 Merge
弄乱发出的值的顺序,引入了内部序列的额外包装。每个内部序列都被投影到一个 IObservable<IObservable<T>>
,它立即发出内部序列,并在内部序列完成时完成。这种包装是使用 Observable.Create
方法实现的:
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
{
return source.Select(inner => inner.Replay(buffered => Observable
.Create<IObservable<T>>(observer =>
{
observer.OnNext(buffered);
return buffered.Subscribe(_ => { }, observer.OnError, observer.OnCompleted);
})))
.Merge(maximumConcurrency)
.Merge(1);
}
Replay
运算符缓冲内部序列发出的所有消息,因此它们不会在 Merge
订阅和 Merge(1)
.
有趣的是,由于换行,创建了一个中间 IObservable<IObservable<IObservable<T>>>
序列。然后这个可怕的东西被解包了两次,第一次是 Merge
,第二次是 Merge(1)
运算符。
这不是一个非常有效的解决方案,因为没有理由缓冲 Merge(1)
当前订阅的内部序列。优化这种低效率并不是微不足道的,所以我将保持原样。在每个子序列包含少量元素的场景中,这个缺陷的影响应该可以忽略不计。在这些情况下,尝试修复它甚至可能弊大于利。
¹ 理想情况下,我想使用 Concat
而不是等效但效率较低的 Merge(1)
运算符。不幸的是,当前版本的 Rx 库 (5.0.0) 中的 Concat
运算符 behaves weirdly。在相当复杂的查询中使用 Concat
时,我什至遇到了死锁行为,通过切换到 Merge(1)
运算符解决了这个问题。
注意: 此答案的原始实现,具有 SemaphoreSlim
for controlling the concurrency instead of the Merge
operator, can be found in the 1st revision。基于 Merge
的实现应该更好,因为它不涉及即发即弃的任务延续,并且对内部序列的订阅是同步发生的,而不是卸载到 ThreadPool
.