合并两个 Observable 时保留排序
Preserve Sorting While Merging Two Observables
我想合并 2 个 observables 并保持顺序(可能基于选择器)。我还想对 observable 的来源施加反压。
因此选择器会选择其中一项通过可观察对象继续推进,而另一项将等待另一项也来进行比较。
Src1、Src2 和 Result 的类型都是 IObservable<T>
。
Src1: { 1,3,6,8,9,10 }
Src2: { 2,4,5,7,11,12 }
Result: 1,2,3,4,5,6,7,8,9,10,11,12
Timeline:
Src1: -1---3----6------8----9-10
Src2: --2-----4---5-7----11---------12
Result: --1--2--3-4-5-6--7-8--9-10-11-12
- 在上面的示例中,src1 发出“1”并被阻塞,直到 src2 发出
这是第一项,“2”。
- 选择最小项目的选择器被应用
从 src1 选择项目。
- Src2 现在等待下一个项目(来自 src1)与其进行比较
当前项目('2')。
- 当 src1 发出下一个项目“3”时,选择又是 运行,这
是时候从 src2 中选择项目了。
- 重复此过程,直到其中一个可观察对象完成。然后,剩余的 observable 会推送项目直到完成。
这是否可以通过现有的 .net Rx 方法实现?
编辑:请注意 2 个源可观察量是 gua运行teed 是有序的。
示例测试:
var source1 = new List<int>() { 1, 4, 6, 7, 8, 10, 14 }.AsEnumerable();
var source2 = new List<int>() { 2, 3, 5, 9, 11, 12, 13, 15 }.AsEnumerable();
var src1 = source1.ToObservable();
var src2 = source2.ToObservable();
var res = src1.SortedMerge(src2, (a, b) =>
{
if (a <= b)
return a;
else
return b;
});
res.Subscribe((x) => Console.Write($"{x}, "));
期望结果:1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
这很有趣。不得不稍微调整一下算法。可以进一步改进。
假设:
- 有两个流,
streamA
,streamB
普通类型 T
。
- 这两个流分别排序为
streamA[i] < streamA[i+1]
和 streamB[i] < stream[i+1]
。
- 您不能假定
streamA[i]
和 streamB[i]
之间存在任何关系。
- 流 A 和 B 是谨慎的:不会从两者发出相同的元素。如果发生这种情况,我将抛出
NotImplementedException
。 这种情况很容易处理,但我想避免歧义。
- 类型
T
有一个函数 min
。
- 没有关于两个流的相对速度的假设,但如果一个始终比另一个快,背压将是一个问题。
这是我使用的算法:
- 设两个队列,
qA
和 qB
。
- 当您从
streamA
中获取一个项目时,将其排入 qA
。
- 当您从
streamB
中获取一个项目时,将其排队到 qB
中。
- 虽然
qA
和 qB
中都有一个项目,但比较两个队列的顶部项目。移除并发出这两者的最小值。如果两个队列仍然非空,重复。
- 如果
streamA
或 streamB
完成,转储队列的内容并终止。 注意:这确实很懒,应该改成dump,然后继续返回未完成的observable。
代码如下:
public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other)
{
return SortedMerge(source, other, (a, b) => Enumerable.Min(new[] { a, b}));
}
public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other, Func<T, T, T> min)
{
return source
.Select(i => (key: 1, value: i)).Materialize()
.Merge(other.Select(i => (key: 2, value: i)).Materialize())
.Scan((qA: ImmutableQueue<T>.Empty, qB: ImmutableQueue<T>.Empty, exception: (Exception)null, outputMessages: new List<T>()),
(state, message) =>
{
if (message.Kind == NotificationKind.OnNext)
{
var key = message.Value.key;
var value = message.Value.value;
var qA = state.qA;
var qB = state.qB;
if (key == 1)
qA = qA.Enqueue(value);
else
qB = qB.Enqueue(value);
var output = new List<T>();
while(!qA.IsEmpty && !qB.IsEmpty)
{
var aVal = qA.Peek();
var bVal = qB.Peek();
var minVal = min(aVal, bVal);
if(aVal.Equals(minVal) && bVal.Equals(minVal))
throw new NotImplementedException();
if(aVal.Equals(minVal))
{
output.Add(aVal);
qA = qA.Dequeue();
}
else
{
output.Add(bVal);
qB = qB.Dequeue();
}
}
return (qA, qB, null, output);
}
else if (message.Kind == NotificationKind.OnError)
{
return (state.qA, state.qB, message.Exception, new List<T>());
}
else //message.Kind == NotificationKind.OnCompleted
{
var output = state.qA.Concat(state.qB).ToList();
return (ImmutableQueue<T>.Empty, ImmutableQueue<T>.Empty, null, output);
}
})
.Publish(tuples => Observable.Merge(
tuples
.Where(t => t.outputMessages.Any() && (!t.qA.IsEmpty || !t.qB.IsEmpty))
.SelectMany(t => t.outputMessages
.Select(v => Notification.CreateOnNext<T>(v))
.ToObservable()
),
tuples
.Where(t => t.outputMessages.Any() && t.qA.IsEmpty && t.qB.IsEmpty)
.SelectMany(t => t.outputMessages
.Select(v => Notification.CreateOnNext<T>(v))
.ToObservable()
.Concat(Observable.Return(Notification.CreateOnCompleted<T>()))
),
tuples
.Where(t => t.exception != null)
.Select(t => Notification.CreateOnError<T>(t.exception))
))
.Dematerialize();
ImmutableQueue
来自 System.Collections.Immutable
。 Scan
需要跟踪状态。由于 OnCompleted
处理,需要具体化。诚然,这是一个复杂的解决方案,但我不确定是否有更清晰的以 Rx 为中心的方式。
让我知道您是否需要更多说明。
我想合并 2 个 observables 并保持顺序(可能基于选择器)。我还想对 observable 的来源施加反压。
因此选择器会选择其中一项通过可观察对象继续推进,而另一项将等待另一项也来进行比较。
Src1、Src2 和 Result 的类型都是 IObservable<T>
。
Src1: { 1,3,6,8,9,10 }
Src2: { 2,4,5,7,11,12 }
Result: 1,2,3,4,5,6,7,8,9,10,11,12
Timeline:
Src1: -1---3----6------8----9-10
Src2: --2-----4---5-7----11---------12
Result: --1--2--3-4-5-6--7-8--9-10-11-12
- 在上面的示例中,src1 发出“1”并被阻塞,直到 src2 发出 这是第一项,“2”。
- 选择最小项目的选择器被应用 从 src1 选择项目。
- Src2 现在等待下一个项目(来自 src1)与其进行比较 当前项目('2')。
- 当 src1 发出下一个项目“3”时,选择又是 运行,这 是时候从 src2 中选择项目了。
- 重复此过程,直到其中一个可观察对象完成。然后,剩余的 observable 会推送项目直到完成。
这是否可以通过现有的 .net Rx 方法实现?
编辑:请注意 2 个源可观察量是 gua运行teed 是有序的。
示例测试:
var source1 = new List<int>() { 1, 4, 6, 7, 8, 10, 14 }.AsEnumerable();
var source2 = new List<int>() { 2, 3, 5, 9, 11, 12, 13, 15 }.AsEnumerable();
var src1 = source1.ToObservable();
var src2 = source2.ToObservable();
var res = src1.SortedMerge(src2, (a, b) =>
{
if (a <= b)
return a;
else
return b;
});
res.Subscribe((x) => Console.Write($"{x}, "));
期望结果:1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
这很有趣。不得不稍微调整一下算法。可以进一步改进。
假设:
- 有两个流,
streamA
,streamB
普通类型T
。 - 这两个流分别排序为
streamA[i] < streamA[i+1]
和streamB[i] < stream[i+1]
。 - 您不能假定
streamA[i]
和streamB[i]
之间存在任何关系。 - 流 A 和 B 是谨慎的:不会从两者发出相同的元素。如果发生这种情况,我将抛出
NotImplementedException
。 这种情况很容易处理,但我想避免歧义。 - 类型
T
有一个函数min
。 - 没有关于两个流的相对速度的假设,但如果一个始终比另一个快,背压将是一个问题。
这是我使用的算法:
- 设两个队列,
qA
和qB
。 - 当您从
streamA
中获取一个项目时,将其排入qA
。 - 当您从
streamB
中获取一个项目时,将其排队到qB
中。 - 虽然
qA
和qB
中都有一个项目,但比较两个队列的顶部项目。移除并发出这两者的最小值。如果两个队列仍然非空,重复。 - 如果
streamA
或streamB
完成,转储队列的内容并终止。 注意:这确实很懒,应该改成dump,然后继续返回未完成的observable。
代码如下:
public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other)
{
return SortedMerge(source, other, (a, b) => Enumerable.Min(new[] { a, b}));
}
public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other, Func<T, T, T> min)
{
return source
.Select(i => (key: 1, value: i)).Materialize()
.Merge(other.Select(i => (key: 2, value: i)).Materialize())
.Scan((qA: ImmutableQueue<T>.Empty, qB: ImmutableQueue<T>.Empty, exception: (Exception)null, outputMessages: new List<T>()),
(state, message) =>
{
if (message.Kind == NotificationKind.OnNext)
{
var key = message.Value.key;
var value = message.Value.value;
var qA = state.qA;
var qB = state.qB;
if (key == 1)
qA = qA.Enqueue(value);
else
qB = qB.Enqueue(value);
var output = new List<T>();
while(!qA.IsEmpty && !qB.IsEmpty)
{
var aVal = qA.Peek();
var bVal = qB.Peek();
var minVal = min(aVal, bVal);
if(aVal.Equals(minVal) && bVal.Equals(minVal))
throw new NotImplementedException();
if(aVal.Equals(minVal))
{
output.Add(aVal);
qA = qA.Dequeue();
}
else
{
output.Add(bVal);
qB = qB.Dequeue();
}
}
return (qA, qB, null, output);
}
else if (message.Kind == NotificationKind.OnError)
{
return (state.qA, state.qB, message.Exception, new List<T>());
}
else //message.Kind == NotificationKind.OnCompleted
{
var output = state.qA.Concat(state.qB).ToList();
return (ImmutableQueue<T>.Empty, ImmutableQueue<T>.Empty, null, output);
}
})
.Publish(tuples => Observable.Merge(
tuples
.Where(t => t.outputMessages.Any() && (!t.qA.IsEmpty || !t.qB.IsEmpty))
.SelectMany(t => t.outputMessages
.Select(v => Notification.CreateOnNext<T>(v))
.ToObservable()
),
tuples
.Where(t => t.outputMessages.Any() && t.qA.IsEmpty && t.qB.IsEmpty)
.SelectMany(t => t.outputMessages
.Select(v => Notification.CreateOnNext<T>(v))
.ToObservable()
.Concat(Observable.Return(Notification.CreateOnCompleted<T>()))
),
tuples
.Where(t => t.exception != null)
.Select(t => Notification.CreateOnError<T>(t.exception))
))
.Dematerialize();
ImmutableQueue
来自 System.Collections.Immutable
。 Scan
需要跟踪状态。由于 OnCompleted
处理,需要具体化。诚然,这是一个复杂的解决方案,但我不确定是否有更清晰的以 Rx 为中心的方式。
让我知道您是否需要更多说明。