Reactive Extensions 能否满足顺序不变性、同步和多线程?

Can Reactive Extensions satisfy order invariance, synchronization and multithreading?

我希望能够在多个核心上处理事件流,但要保持所有内容同步,以便所有订阅者以锁定步骤处理事件,因此没有任何一个订阅者会领先于任何其他订阅者。

换句话说,我希望快速订阅者等到所有其他慢速订阅者完成每个事件,然后再进行下一个事件。每个订阅者都有一个过滤器,因此它只处理它感兴趣的事件。

如果这可行,我可以轻松利用系统中的所有内核,而不会 运行 陷入太多的多线程或同步问题。

例子

假设我们有一个在单个线程上生成的 RX 事件流。我们有两个 RX 订阅者,AB。我们有这些限制:

到目前为止我有什么

我试过很多Synchronize的组合,结合下面的代码:

var sw = Stopwatch.StartNew();
var rx = new Subject<int>();
rx.ObserveOn(ThreadPoolScheduler.Instance)
    .Subscribe(o =>
{
    // Fast Subscriber A. Takes 20 milliseconds.
    Thread.Sleep(TimeSpan.FromMilliseconds(20));
    Console.Write("Subscriber A: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});

rx.ObserveOn(ThreadPoolScheduler.Instance)              
    .Subscribe(o =>
{
    // Slow Subscriber B. Takes 500 milliseconds.
    Thread.Sleep(TimeSpan.FromMilliseconds(500));
    Console.Write("Subscriber B: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});

for (int j = 0; j < 5; j++)
{

    int j1 = j;
    rx.OnNext(j1);
    Console.Write("Push: {0} (thread {1})\n", j, Thread.CurrentThread.ManagedThreadId);
}

程序的当前输出

Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 288 milliseconds.
Subscriber A: j=1 (thread 10). Time: 308 milliseconds.
Subscriber A: j=2 (thread 10). Time: 328 milliseconds.
Subscriber A: j=3 (thread 10). Time: 348 milliseconds.
Subscriber A: j=4 (thread 10). Time: 368 milliseconds.
Subscriber B: j=0 (thread 11). Time: 768 milliseconds.
Subscriber B: j=1 (thread 11). Time: 1268 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1768 milliseconds.
Subscriber B: j=3 (thread 11). Time: 2268 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2768 milliseconds.

程序的期望输出

Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 000 milliseconds.
Subscriber B: j=0 (thread 11). Time: 000 milliseconds.
Subscriber A: j=1 (thread 10). Time: 500 milliseconds.
Subscriber B: j=1 (thread 11). Time: 500 milliseconds.
Subscriber A: j=2 (thread 10). Time: 1000 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1000 milliseconds.
Subscriber A: j=3 (thread 10). Time: 1500 milliseconds.
Subscriber B: j=3 (thread 11). Time: 1500 milliseconds.
Subscriber A: j=4 (thread 10). Time: 2000 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2000 milliseconds.

本质上,我希望所有订阅者并行处理事件 j=0,然后所有订阅者并行处理事件 j=1,等等,即使一些订阅者比其他订阅者慢。在这种情况下,订阅者 A 很快(20 毫秒)而订阅者 B 很慢(500 毫秒),因此我们需要某种锁或门,以便订阅者 A 等待订阅者 B 完成后再继续下一个事件,或者副如果订阅者 B 比订阅者 A 快,则相反。

当然,这是在单线程模式下自然会发生的情况,但是这样就失去了由多个订阅者并行处理同一事件的能力,这意味着我无法轻松利用我的系统。

更新

感谢@Jonas Chapuis 使用 Sort() 的回答。

但是,在这种特殊情况下,我的目标是阻止快速订阅者在消费事件时领先于慢速订阅者,即我需要某种锁或门,以便快速订阅者等待所有慢速订阅者已经完成了事件,然后才进入下一个事件。

换句话说,我希望所有订阅者在事件中同步移动,没有单个订阅者领先于其他订阅者。 RX 事件将在单个线程上创建,因此它们永远不会乱序。

更新

几个月后,事实证明我使用了错误的体系结构,这是一个错误的问题。

我应该观察 EventLoopScheduler,而不是观察 ThreadPoolScheduler.Instance,它将所有订阅锁定到单个线程。这样可以保留顺序。

为了获得时间序列数据的并行性,最好将数据处理分为多个阶段的管道,每个线程集中在一个管道阶段。这样处理起来就简单多了,而且满足上面所有的约束。

据我所知,您正试图在保持元素顺序的同时最大化处理吞吐量。这是 James World 在 Reordering events with Reactive Extensions.

中描述的基于自定义 Sort() 运算符的解决方案

并行处理元素意味着失去顺序。为了恢复原始顺序,我们使用 Sort() 运算符(在幕后,该运算符主要根据传递的密钥生成器函数执行缓冲和释放值)。

  var random = new Random();
  var xs = Observable.Range(0, 10);
  xs.SelectMany((index, value) => Observable.Start(() =>
  {
      Thread.Sleep(TimeSpan.FromMilliseconds(random.Next(0, 1000)));
      Console.WriteLine("Thread {0}: processing value {1}.",
          Thread.CurrentThread.ManagedThreadId.ToString().PadLeft(2), value);
      return new {Index = index, Value = value};
  }, ThreadPoolScheduler.Instance))
      .Sort(el => el.Index, 0, i => i + 1)
      .Subscribe(el => Console.WriteLine(el.Value));

这将产生如下输出:

Thread 15: processing value 7.
Thread 10: processing value 0.
0
Thread 16: processing value 5.
Thread 11: processing value 1.
1
Thread 15: processing value 8.
Thread 14: processing value 6.
Thread 13: processing value 2.
2
Thread 10: processing value 9.
Thread 17: processing value 4.
Thread 12: processing value 3.
3
4
5
6
7
8
9

好的,我确实没有完全理解您的要求,对此感到抱歉。下面你会发现一种不同的方法,它依赖于订阅者通过专门的主题发出他们已经完成的信号。然后将这些主题压缩在一起:这为您提供了 "lock" 语义(请注意 Zip 运算符的重载最多支持 16 个源)。

 var sw = Stopwatch.StartNew();
 var rx = new Subject<int>();
 var subscriberADone = new Subject<Unit>();
 var subscriberBDone = new Subject<Unit>();
 var bothSubscribersDone = subscriberADone.Zip(subscriberBDone, (_, __) => Unit.Default);
 var lockStepInput = rx.Zip(bothSubscribersDone.StartWith(Unit.Default), (i, _) => i);
 lockStepInput.ObserveOn(ThreadPoolScheduler.Instance)
     .Subscribe(o =>
     {
         // Fast Subscriber A. Takes 20 milliseconds.
         Thread.Sleep(TimeSpan.FromMilliseconds(20));
         Console.Write("Subscriber A: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
         subscriberADone.OnNext(Unit.Default);
     });

 lockStepInput.ObserveOn(ThreadPoolScheduler.Instance)
     .Subscribe(o =>
     {
         // Slow Subscriber B. Takes 500 milliseconds.
         Thread.Sleep(TimeSpan.FromMilliseconds(500));
         Console.Write("Subscriber B: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
         subscriberBDone.OnNext(Unit.Default);
     });

 for (int j = 0; j < 5; j++)
 {

     int j1 = j;
     rx.OnNext(j1);
     Console.Write("Push: {0} (thread {1})\n", j, Thread.CurrentThread.ManagedThreadId);
 }

这生成了以下输出:

Push: 0 (thread 9)
Push: 1 (thread 9)
Push: 2 (thread 9)
Push: 3 (thread 9)
Push: 4 (thread 9)
Subscriber A: 0 (thread 10). Time: 111 milliseconds.
Subscriber B: 0 (thread 11). Time: 591 milliseconds.
Subscriber A: 1 (thread 10). Time: 611 milliseconds.
Subscriber B: 1 (thread 11). Time: 1091 milliseconds.
Subscriber A: 2 (thread 10). Time: 1111 milliseconds.
Subscriber B: 2 (thread 11). Time: 1591 milliseconds.
Subscriber A: 3 (thread 10). Time: 1611 milliseconds.
Subscriber B: 3 (thread 11). Time: 2091 milliseconds.
Subscriber A: 4 (thread 10). Time: 2111 milliseconds.
Subscriber B: 4 (thread 11). Time: 2591 milliseconds.