Reactive Extensions 能否满足顺序不变性、同步和多线程?
Can Reactive Extensions satisfy order invariance, synchronization and multithreading?
我希望能够在多个核心上处理事件流,但要保持所有内容同步,以便所有订阅者以锁定步骤处理事件,因此没有任何一个订阅者会领先于任何其他订阅者。
换句话说,我希望快速订阅者等到所有其他慢速订阅者完成每个事件,然后再进行下一个事件。每个订阅者都有一个过滤器,因此它只处理它感兴趣的事件。
如果这可行,我可以轻松利用系统中的所有内核,而不会 运行 陷入太多的多线程或同步问题。
例子
假设我们有一个在单个线程上生成的 RX 事件流。我们有两个 RX 订阅者,A
和 B
。我们有这些限制:
- 每个 RX 事件必须由所有订阅者以锁定步骤处理,即事件
j=2
不会被订阅者 B
处理,直到事件 j=1
已被所有订阅者 A
和 B
完全处理,事件 j=3
将不会被订阅者 B 处理,直到事件 j=2
已被所有订阅者完全处理 A
和B
等
- 并行处理每个 RX 事件,即订阅者
A
可以与订阅者 B
并行处理事件 j=1
处理事件 j=1
,等等
- 顺序不变性,即所有订阅者都按照事件创建的顺序接收事件,因此事件
j=0
将始终进行 j=1
,事件 j=1
将始终进行 j=2
,等等。如果在单个线程上推送事件,则会自动发生这种情况,因此已经满足此约束条件。
到目前为止我有什么
我试过很多Synchronize
的组合,结合下面的代码:
var sw = Stopwatch.StartNew();
var rx = new Subject<int>();
rx.ObserveOn(ThreadPoolScheduler.Instance)
.Subscribe(o =>
{
// Fast Subscriber A. Takes 20 milliseconds.
Thread.Sleep(TimeSpan.FromMilliseconds(20));
Console.Write("Subscriber A: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});
rx.ObserveOn(ThreadPoolScheduler.Instance)
.Subscribe(o =>
{
// Slow Subscriber B. Takes 500 milliseconds.
Thread.Sleep(TimeSpan.FromMilliseconds(500));
Console.Write("Subscriber B: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});
for (int j = 0; j < 5; j++)
{
int j1 = j;
rx.OnNext(j1);
Console.Write("Push: {0} (thread {1})\n", j, Thread.CurrentThread.ManagedThreadId);
}
程序的当前输出
Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 288 milliseconds.
Subscriber A: j=1 (thread 10). Time: 308 milliseconds.
Subscriber A: j=2 (thread 10). Time: 328 milliseconds.
Subscriber A: j=3 (thread 10). Time: 348 milliseconds.
Subscriber A: j=4 (thread 10). Time: 368 milliseconds.
Subscriber B: j=0 (thread 11). Time: 768 milliseconds.
Subscriber B: j=1 (thread 11). Time: 1268 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1768 milliseconds.
Subscriber B: j=3 (thread 11). Time: 2268 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2768 milliseconds.
程序的期望输出
Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 000 milliseconds.
Subscriber B: j=0 (thread 11). Time: 000 milliseconds.
Subscriber A: j=1 (thread 10). Time: 500 milliseconds.
Subscriber B: j=1 (thread 11). Time: 500 milliseconds.
Subscriber A: j=2 (thread 10). Time: 1000 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1000 milliseconds.
Subscriber A: j=3 (thread 10). Time: 1500 milliseconds.
Subscriber B: j=3 (thread 11). Time: 1500 milliseconds.
Subscriber A: j=4 (thread 10). Time: 2000 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2000 milliseconds.
本质上,我希望所有订阅者并行处理事件 j=0
,然后所有订阅者并行处理事件 j=1
,等等,即使一些订阅者比其他订阅者慢。在这种情况下,订阅者 A 很快(20 毫秒)而订阅者 B 很慢(500 毫秒),因此我们需要某种锁或门,以便订阅者 A 等待订阅者 B 完成后再继续下一个事件,或者副如果订阅者 B 比订阅者 A 快,则相反。
当然,这是在单线程模式下自然会发生的情况,但是这样就失去了由多个订阅者并行处理同一事件的能力,这意味着我无法轻松利用我的系统。
更新
感谢@Jonas Chapuis 使用 Sort()
的回答。
但是,在这种特殊情况下,我的目标是阻止快速订阅者在消费事件时领先于慢速订阅者,即我需要某种锁或门,以便快速订阅者等待所有慢速订阅者已经完成了事件,然后才进入下一个事件。
换句话说,我希望所有订阅者在事件中同步移动,没有单个订阅者领先于其他订阅者。 RX 事件将在单个线程上创建,因此它们永远不会乱序。
更新
几个月后,事实证明我使用了错误的体系结构,这是一个错误的问题。
我应该观察 EventLoopScheduler
,而不是观察 ThreadPoolScheduler.Instance
,它将所有订阅锁定到单个线程。这样可以保留顺序。
为了获得时间序列数据的并行性,最好将数据处理分为多个阶段的管道,每个线程集中在一个管道阶段。这样处理起来就简单多了,而且满足上面所有的约束。
据我所知,您正试图在保持元素顺序的同时最大化处理吞吐量。这是 James World 在 Reordering events with Reactive Extensions.
中描述的基于自定义 Sort()
运算符的解决方案
并行处理元素意味着失去顺序。为了恢复原始顺序,我们使用 Sort()
运算符(在幕后,该运算符主要根据传递的密钥生成器函数执行缓冲和释放值)。
var random = new Random();
var xs = Observable.Range(0, 10);
xs.SelectMany((index, value) => Observable.Start(() =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(random.Next(0, 1000)));
Console.WriteLine("Thread {0}: processing value {1}.",
Thread.CurrentThread.ManagedThreadId.ToString().PadLeft(2), value);
return new {Index = index, Value = value};
}, ThreadPoolScheduler.Instance))
.Sort(el => el.Index, 0, i => i + 1)
.Subscribe(el => Console.WriteLine(el.Value));
这将产生如下输出:
Thread 15: processing value 7.
Thread 10: processing value 0.
0
Thread 16: processing value 5.
Thread 11: processing value 1.
1
Thread 15: processing value 8.
Thread 14: processing value 6.
Thread 13: processing value 2.
2
Thread 10: processing value 9.
Thread 17: processing value 4.
Thread 12: processing value 3.
3
4
5
6
7
8
9
好的,我确实没有完全理解您的要求,对此感到抱歉。下面你会发现一种不同的方法,它依赖于订阅者通过专门的主题发出他们已经完成的信号。然后将这些主题压缩在一起:这为您提供了 "lock" 语义(请注意 Zip
运算符的重载最多支持 16 个源)。
var sw = Stopwatch.StartNew();
var rx = new Subject<int>();
var subscriberADone = new Subject<Unit>();
var subscriberBDone = new Subject<Unit>();
var bothSubscribersDone = subscriberADone.Zip(subscriberBDone, (_, __) => Unit.Default);
var lockStepInput = rx.Zip(bothSubscribersDone.StartWith(Unit.Default), (i, _) => i);
lockStepInput.ObserveOn(ThreadPoolScheduler.Instance)
.Subscribe(o =>
{
// Fast Subscriber A. Takes 20 milliseconds.
Thread.Sleep(TimeSpan.FromMilliseconds(20));
Console.Write("Subscriber A: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
subscriberADone.OnNext(Unit.Default);
});
lockStepInput.ObserveOn(ThreadPoolScheduler.Instance)
.Subscribe(o =>
{
// Slow Subscriber B. Takes 500 milliseconds.
Thread.Sleep(TimeSpan.FromMilliseconds(500));
Console.Write("Subscriber B: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
subscriberBDone.OnNext(Unit.Default);
});
for (int j = 0; j < 5; j++)
{
int j1 = j;
rx.OnNext(j1);
Console.Write("Push: {0} (thread {1})\n", j, Thread.CurrentThread.ManagedThreadId);
}
这生成了以下输出:
Push: 0 (thread 9)
Push: 1 (thread 9)
Push: 2 (thread 9)
Push: 3 (thread 9)
Push: 4 (thread 9)
Subscriber A: 0 (thread 10). Time: 111 milliseconds.
Subscriber B: 0 (thread 11). Time: 591 milliseconds.
Subscriber A: 1 (thread 10). Time: 611 milliseconds.
Subscriber B: 1 (thread 11). Time: 1091 milliseconds.
Subscriber A: 2 (thread 10). Time: 1111 milliseconds.
Subscriber B: 2 (thread 11). Time: 1591 milliseconds.
Subscriber A: 3 (thread 10). Time: 1611 milliseconds.
Subscriber B: 3 (thread 11). Time: 2091 milliseconds.
Subscriber A: 4 (thread 10). Time: 2111 milliseconds.
Subscriber B: 4 (thread 11). Time: 2591 milliseconds.
我希望能够在多个核心上处理事件流,但要保持所有内容同步,以便所有订阅者以锁定步骤处理事件,因此没有任何一个订阅者会领先于任何其他订阅者。
换句话说,我希望快速订阅者等到所有其他慢速订阅者完成每个事件,然后再进行下一个事件。每个订阅者都有一个过滤器,因此它只处理它感兴趣的事件。
如果这可行,我可以轻松利用系统中的所有内核,而不会 运行 陷入太多的多线程或同步问题。
例子
假设我们有一个在单个线程上生成的 RX 事件流。我们有两个 RX 订阅者,A
和 B
。我们有这些限制:
- 每个 RX 事件必须由所有订阅者以锁定步骤处理,即事件
j=2
不会被订阅者B
处理,直到事件j=1
已被所有订阅者A
和B
完全处理,事件j=3
将不会被订阅者 B 处理,直到事件j=2
已被所有订阅者完全处理A
和B
等 - 并行处理每个 RX 事件,即订阅者
A
可以与订阅者B
并行处理事件j=1
处理事件j=1
,等等 - 顺序不变性,即所有订阅者都按照事件创建的顺序接收事件,因此事件
j=0
将始终进行j=1
,事件j=1
将始终进行j=2
,等等。如果在单个线程上推送事件,则会自动发生这种情况,因此已经满足此约束条件。
到目前为止我有什么
我试过很多Synchronize
的组合,结合下面的代码:
var sw = Stopwatch.StartNew();
var rx = new Subject<int>();
rx.ObserveOn(ThreadPoolScheduler.Instance)
.Subscribe(o =>
{
// Fast Subscriber A. Takes 20 milliseconds.
Thread.Sleep(TimeSpan.FromMilliseconds(20));
Console.Write("Subscriber A: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});
rx.ObserveOn(ThreadPoolScheduler.Instance)
.Subscribe(o =>
{
// Slow Subscriber B. Takes 500 milliseconds.
Thread.Sleep(TimeSpan.FromMilliseconds(500));
Console.Write("Subscriber B: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});
for (int j = 0; j < 5; j++)
{
int j1 = j;
rx.OnNext(j1);
Console.Write("Push: {0} (thread {1})\n", j, Thread.CurrentThread.ManagedThreadId);
}
程序的当前输出
Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 288 milliseconds.
Subscriber A: j=1 (thread 10). Time: 308 milliseconds.
Subscriber A: j=2 (thread 10). Time: 328 milliseconds.
Subscriber A: j=3 (thread 10). Time: 348 milliseconds.
Subscriber A: j=4 (thread 10). Time: 368 milliseconds.
Subscriber B: j=0 (thread 11). Time: 768 milliseconds.
Subscriber B: j=1 (thread 11). Time: 1268 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1768 milliseconds.
Subscriber B: j=3 (thread 11). Time: 2268 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2768 milliseconds.
程序的期望输出
Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 000 milliseconds.
Subscriber B: j=0 (thread 11). Time: 000 milliseconds.
Subscriber A: j=1 (thread 10). Time: 500 milliseconds.
Subscriber B: j=1 (thread 11). Time: 500 milliseconds.
Subscriber A: j=2 (thread 10). Time: 1000 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1000 milliseconds.
Subscriber A: j=3 (thread 10). Time: 1500 milliseconds.
Subscriber B: j=3 (thread 11). Time: 1500 milliseconds.
Subscriber A: j=4 (thread 10). Time: 2000 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2000 milliseconds.
本质上,我希望所有订阅者并行处理事件 j=0
,然后所有订阅者并行处理事件 j=1
,等等,即使一些订阅者比其他订阅者慢。在这种情况下,订阅者 A 很快(20 毫秒)而订阅者 B 很慢(500 毫秒),因此我们需要某种锁或门,以便订阅者 A 等待订阅者 B 完成后再继续下一个事件,或者副如果订阅者 B 比订阅者 A 快,则相反。
当然,这是在单线程模式下自然会发生的情况,但是这样就失去了由多个订阅者并行处理同一事件的能力,这意味着我无法轻松利用我的系统。
更新
感谢@Jonas Chapuis 使用 Sort()
的回答。
但是,在这种特殊情况下,我的目标是阻止快速订阅者在消费事件时领先于慢速订阅者,即我需要某种锁或门,以便快速订阅者等待所有慢速订阅者已经完成了事件,然后才进入下一个事件。
换句话说,我希望所有订阅者在事件中同步移动,没有单个订阅者领先于其他订阅者。 RX 事件将在单个线程上创建,因此它们永远不会乱序。
更新
几个月后,事实证明我使用了错误的体系结构,这是一个错误的问题。
我应该观察 EventLoopScheduler
,而不是观察 ThreadPoolScheduler.Instance
,它将所有订阅锁定到单个线程。这样可以保留顺序。
为了获得时间序列数据的并行性,最好将数据处理分为多个阶段的管道,每个线程集中在一个管道阶段。这样处理起来就简单多了,而且满足上面所有的约束。
据我所知,您正试图在保持元素顺序的同时最大化处理吞吐量。这是 James World 在 Reordering events with Reactive Extensions.
中描述的基于自定义Sort()
运算符的解决方案
并行处理元素意味着失去顺序。为了恢复原始顺序,我们使用 Sort()
运算符(在幕后,该运算符主要根据传递的密钥生成器函数执行缓冲和释放值)。
var random = new Random();
var xs = Observable.Range(0, 10);
xs.SelectMany((index, value) => Observable.Start(() =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(random.Next(0, 1000)));
Console.WriteLine("Thread {0}: processing value {1}.",
Thread.CurrentThread.ManagedThreadId.ToString().PadLeft(2), value);
return new {Index = index, Value = value};
}, ThreadPoolScheduler.Instance))
.Sort(el => el.Index, 0, i => i + 1)
.Subscribe(el => Console.WriteLine(el.Value));
这将产生如下输出:
Thread 15: processing value 7.
Thread 10: processing value 0.
0
Thread 16: processing value 5.
Thread 11: processing value 1.
1
Thread 15: processing value 8.
Thread 14: processing value 6.
Thread 13: processing value 2.
2
Thread 10: processing value 9.
Thread 17: processing value 4.
Thread 12: processing value 3.
3
4
5
6
7
8
9
好的,我确实没有完全理解您的要求,对此感到抱歉。下面你会发现一种不同的方法,它依赖于订阅者通过专门的主题发出他们已经完成的信号。然后将这些主题压缩在一起:这为您提供了 "lock" 语义(请注意 Zip
运算符的重载最多支持 16 个源)。
var sw = Stopwatch.StartNew();
var rx = new Subject<int>();
var subscriberADone = new Subject<Unit>();
var subscriberBDone = new Subject<Unit>();
var bothSubscribersDone = subscriberADone.Zip(subscriberBDone, (_, __) => Unit.Default);
var lockStepInput = rx.Zip(bothSubscribersDone.StartWith(Unit.Default), (i, _) => i);
lockStepInput.ObserveOn(ThreadPoolScheduler.Instance)
.Subscribe(o =>
{
// Fast Subscriber A. Takes 20 milliseconds.
Thread.Sleep(TimeSpan.FromMilliseconds(20));
Console.Write("Subscriber A: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
subscriberADone.OnNext(Unit.Default);
});
lockStepInput.ObserveOn(ThreadPoolScheduler.Instance)
.Subscribe(o =>
{
// Slow Subscriber B. Takes 500 milliseconds.
Thread.Sleep(TimeSpan.FromMilliseconds(500));
Console.Write("Subscriber B: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
subscriberBDone.OnNext(Unit.Default);
});
for (int j = 0; j < 5; j++)
{
int j1 = j;
rx.OnNext(j1);
Console.Write("Push: {0} (thread {1})\n", j, Thread.CurrentThread.ManagedThreadId);
}
这生成了以下输出:
Push: 0 (thread 9)
Push: 1 (thread 9)
Push: 2 (thread 9)
Push: 3 (thread 9)
Push: 4 (thread 9)
Subscriber A: 0 (thread 10). Time: 111 milliseconds.
Subscriber B: 0 (thread 11). Time: 591 milliseconds.
Subscriber A: 1 (thread 10). Time: 611 milliseconds.
Subscriber B: 1 (thread 11). Time: 1091 milliseconds.
Subscriber A: 2 (thread 10). Time: 1111 milliseconds.
Subscriber B: 2 (thread 11). Time: 1591 milliseconds.
Subscriber A: 3 (thread 10). Time: 1611 milliseconds.
Subscriber B: 3 (thread 11). Time: 2091 milliseconds.
Subscriber A: 4 (thread 10). Time: 2111 milliseconds.
Subscriber B: 4 (thread 11). Time: 2591 milliseconds.