将两个 Observable 与一个优先级更高的 Observable 合并
Merging two Observables with one taking higher priority
是否可以使用 ReactiveExtensions 实现以下功能;
两个 Observable,一个是 "High" 优先级,另一个是 "Low"
将两个 Observable 合并为一个,然后可以对其进行订阅,目的是使生成的 Observable 始终先于任何低优先级项发出高优先级项。
我知道这可以使用两个 ConcurrentQueue
集合和类似这样的东西更简单地实现;
return this.highPriorityItems.TryDequeue(out item)
|| this.lowPriorityItems.TryDequeue(out item);
但是这种方法有一些问题,比如不像 Observable 那样 "subscribable"(所以一旦队列耗尽,处理就会结束,没有很多额外的废话来把它推到一个任务).
此外,我有兴趣在队列上应用一些额外的过滤,例如节流和 "distinct until changed",因此 Rx 似乎很适合这里。
像这样的问题你需要考虑时间。在上面的评论中,您谈到了用户通知。在我看来,你想要的是这样的表达方式:显示最近的通知,除非有高优先级通知,在那种情况下显示。
气泡图可以更容易地对此进行推理。一个字符为一秒:
High : ---------3---5-6
Low : 1--2-------4----
Result: 1--2-----3---5-6
这是你的想法吗?您想缓冲消息并稍后显示它们吗?就像在这种情况下,消息 5 只能显示 2 秒可以吗?
你说的当然是优先队列
Rx 是关于 流 事件,而不是队列。当然,队列 在 Rx 中被大量使用 - 但它们不是第一个 class 概念,更多是 Rx 概念的实现细节的一部分。
我们需要队列的一个很好的例子是处理一个缓慢的观察者。事件在 Rx 中按顺序调度,如果事件到达的速度快于观察者可以处理的速度,那么它们必须针对该观察者排队。如果有很多观察者,那么必须维护多个逻辑队列,因为观察者可能以不同的速度前进 - 并且 Rx 选择不让它们保持同步。
"Back-pressure" 是观察者向可观察对象提供反馈的概念,以便允许机制处理更快可观察对象的压力——例如合并或节流。 Rx 没有 first-class 引入背压的方式——唯一的内置方式是通过 OnNext
的同步特性来监视观察者。任何其他机制都需要带外。您的问题与背压直接相关,因为它仅在观察者速度慢的情况下才相关。
我提到所有这些是为了证明我的说法,即 Rx 不是提供您正在寻找的那种优先级调度的好选择 - 实际上,第一个 class 排队机制似乎更好适合。
要解决手头的问题,您需要在自定义运算符中自行管理优先级排队。重申一下这个问题:你的意思是,如果事件在观察者处理 OnNext
事件期间到达,那么就会有大量事件要分派,而不是 Rx 使用的典型 FIFO 队列,您想根据某些优先级进行调度。
需要注意的是,本着 Rx 如何不让多个观察者保持同步的精神,并发观察者可能会以不同的顺序看到事件,这对您来说可能是也可能不是问题。您可以使用 Publish
之类的机制来获得顺序一致性 - 但您可能不想这样做,因为在这种情况下事件传递的时间会变得非常不可预测且效率低下。
我相信有更好的方法可以做到这一点,但这里有一个基于优先级队列的交付示例 - 您可以将其扩展为适用于多个流和优先级(甚至每个事件的优先级),使用更好的队列实现(例如基于 b 树的优先级队列),但我选择保持它相当简单。即便如此,请注意代码必须解决的大量问题,围绕错误处理、完成等 - 我已经选择了何时发出这些信号,肯定还有很多其他有效的选择。
总而言之,这个实现肯定会让 我 放弃为此使用 Rx 的想法。它足够复杂,无论如何这里都可能存在错误。正如我所说,可能会有更简洁的代码(特别是考虑到我已经付出了最少的努力!),但是概念上,无论实现如何,我都对这个想法感到不舒服:
public static class ObservableExtensions
{
public static IObservable<TSource> MergeWithLowPriorityStream<TSource>(
this IObservable<TSource> source,
IObservable<TSource> lowPriority,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<TSource>(o => {
// BufferBlock from TPL dataflow is used as it is
// handily awaitable. package: Microsoft.Tpl.Dataflow
var loQueue = new BufferBlock<TSource>();
var hiQueue = new BufferBlock<TSource>();
var errorQueue = new BufferBlock<Exception>();
var done = new TaskCompletionSource<int>();
int doneCount = 0;
Action incDone = () => {
var dc = Interlocked.Increment(ref doneCount);
if(dc == 2)
done.SetResult(0);
};
source.Subscribe(
x => hiQueue.Post(x),
e => errorQueue.Post(e),
incDone);
lowPriority.Subscribe(
x => loQueue.Post(x),
e => errorQueue.Post(e),
incDone);
return scheduler.ScheduleAsync(async(ctrl, ct) => {
while(!ct.IsCancellationRequested)
{
TSource nextItem;
if(hiQueue.TryReceive(out nextItem)
|| loQueue.TryReceive(out nextItem))
o.OnNext(nextItem);
else if(done.Task.IsCompleted)
{
o.OnCompleted();
return;
}
Exception error;
if(errorQueue.TryReceive(out error))
{
o.OnError(error);
return;
}
var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct);
var loAvailableAsync = loQueue.OutputAvailableAsync(ct);
var errAvailableAsync =
errorQueue.OutputAvailableAsync(ct);
await Task.WhenAny(
hiAvailableAsync,
loAvailableAsync,
errAvailableAsync,
done.Task);
}
});
});
}
}
以及用法示例:
void static Main()
{
var xs = Observable.Range(0, 3);
var ys = Observable.Range(10, 3);
var source = ys.MergeWithLowPriorityStream(xs);
source.Subscribe(Console.WriteLine, () => Console.WriteLine("Done"));
}
这将首先打印出ys
的元素,表明它们的优先级更高。
是否可以使用 ReactiveExtensions 实现以下功能;
两个 Observable,一个是 "High" 优先级,另一个是 "Low"
将两个 Observable 合并为一个,然后可以对其进行订阅,目的是使生成的 Observable 始终先于任何低优先级项发出高优先级项。
我知道这可以使用两个 ConcurrentQueue
集合和类似这样的东西更简单地实现;
return this.highPriorityItems.TryDequeue(out item)
|| this.lowPriorityItems.TryDequeue(out item);
但是这种方法有一些问题,比如不像 Observable 那样 "subscribable"(所以一旦队列耗尽,处理就会结束,没有很多额外的废话来把它推到一个任务).
此外,我有兴趣在队列上应用一些额外的过滤,例如节流和 "distinct until changed",因此 Rx 似乎很适合这里。
像这样的问题你需要考虑时间。在上面的评论中,您谈到了用户通知。在我看来,你想要的是这样的表达方式:显示最近的通知,除非有高优先级通知,在那种情况下显示。
气泡图可以更容易地对此进行推理。一个字符为一秒:
High : ---------3---5-6
Low : 1--2-------4----
Result: 1--2-----3---5-6
这是你的想法吗?您想缓冲消息并稍后显示它们吗?就像在这种情况下,消息 5 只能显示 2 秒可以吗?
你说的当然是优先队列
Rx 是关于 流 事件,而不是队列。当然,队列 在 Rx 中被大量使用 - 但它们不是第一个 class 概念,更多是 Rx 概念的实现细节的一部分。
我们需要队列的一个很好的例子是处理一个缓慢的观察者。事件在 Rx 中按顺序调度,如果事件到达的速度快于观察者可以处理的速度,那么它们必须针对该观察者排队。如果有很多观察者,那么必须维护多个逻辑队列,因为观察者可能以不同的速度前进 - 并且 Rx 选择不让它们保持同步。
"Back-pressure" 是观察者向可观察对象提供反馈的概念,以便允许机制处理更快可观察对象的压力——例如合并或节流。 Rx 没有 first-class 引入背压的方式——唯一的内置方式是通过 OnNext
的同步特性来监视观察者。任何其他机制都需要带外。您的问题与背压直接相关,因为它仅在观察者速度慢的情况下才相关。
我提到所有这些是为了证明我的说法,即 Rx 不是提供您正在寻找的那种优先级调度的好选择 - 实际上,第一个 class 排队机制似乎更好适合。
要解决手头的问题,您需要在自定义运算符中自行管理优先级排队。重申一下这个问题:你的意思是,如果事件在观察者处理 OnNext
事件期间到达,那么就会有大量事件要分派,而不是 Rx 使用的典型 FIFO 队列,您想根据某些优先级进行调度。
需要注意的是,本着 Rx 如何不让多个观察者保持同步的精神,并发观察者可能会以不同的顺序看到事件,这对您来说可能是也可能不是问题。您可以使用 Publish
之类的机制来获得顺序一致性 - 但您可能不想这样做,因为在这种情况下事件传递的时间会变得非常不可预测且效率低下。
我相信有更好的方法可以做到这一点,但这里有一个基于优先级队列的交付示例 - 您可以将其扩展为适用于多个流和优先级(甚至每个事件的优先级),使用更好的队列实现(例如基于 b 树的优先级队列),但我选择保持它相当简单。即便如此,请注意代码必须解决的大量问题,围绕错误处理、完成等 - 我已经选择了何时发出这些信号,肯定还有很多其他有效的选择。
总而言之,这个实现肯定会让 我 放弃为此使用 Rx 的想法。它足够复杂,无论如何这里都可能存在错误。正如我所说,可能会有更简洁的代码(特别是考虑到我已经付出了最少的努力!),但是概念上,无论实现如何,我都对这个想法感到不舒服:
public static class ObservableExtensions
{
public static IObservable<TSource> MergeWithLowPriorityStream<TSource>(
this IObservable<TSource> source,
IObservable<TSource> lowPriority,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<TSource>(o => {
// BufferBlock from TPL dataflow is used as it is
// handily awaitable. package: Microsoft.Tpl.Dataflow
var loQueue = new BufferBlock<TSource>();
var hiQueue = new BufferBlock<TSource>();
var errorQueue = new BufferBlock<Exception>();
var done = new TaskCompletionSource<int>();
int doneCount = 0;
Action incDone = () => {
var dc = Interlocked.Increment(ref doneCount);
if(dc == 2)
done.SetResult(0);
};
source.Subscribe(
x => hiQueue.Post(x),
e => errorQueue.Post(e),
incDone);
lowPriority.Subscribe(
x => loQueue.Post(x),
e => errorQueue.Post(e),
incDone);
return scheduler.ScheduleAsync(async(ctrl, ct) => {
while(!ct.IsCancellationRequested)
{
TSource nextItem;
if(hiQueue.TryReceive(out nextItem)
|| loQueue.TryReceive(out nextItem))
o.OnNext(nextItem);
else if(done.Task.IsCompleted)
{
o.OnCompleted();
return;
}
Exception error;
if(errorQueue.TryReceive(out error))
{
o.OnError(error);
return;
}
var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct);
var loAvailableAsync = loQueue.OutputAvailableAsync(ct);
var errAvailableAsync =
errorQueue.OutputAvailableAsync(ct);
await Task.WhenAny(
hiAvailableAsync,
loAvailableAsync,
errAvailableAsync,
done.Task);
}
});
});
}
}
以及用法示例:
void static Main()
{
var xs = Observable.Range(0, 3);
var ys = Observable.Range(10, 3);
var source = ys.MergeWithLowPriorityStream(xs);
source.Subscribe(Console.WriteLine, () => Console.WriteLine("Done"));
}
这将首先打印出ys
的元素,表明它们的优先级更高。