并行处理 Rx 事件
Parallel handling Rx events
如果我说
void ChangeQuote(string symbol, double bid, double ask)
{
}
string[] TechETF = {"AAPL", "MSFT"};
var quotesSubscription = quotesObservable.Where(quote => (TechETF.Contains(quote.Symbol)));
quotesSubscription.Subscribe(quote => this.ChangeQuote(quote.Symbol, quote.Bid, quote.Ask));
如果底层事件是异步触发的,subscribe()
中的这段代码是否也在它们自己的线程(线程池)上异步处理这些回调?或者此代码是否序列化(阻止)这些回调的处理?
Rx 总是序列化调用。这是行为契约的一部分。
这里有一个例子来说明这种情况。
从两个计时器开始:
var timers = new []
{
new System.Timers.Timer()
{
Interval = 200.0,
AutoReset = true,
Enabled = true,
},
new System.Timers.Timer()
{
Interval = 250.0,
AutoReset = true,
Enabled = true,
},
};
现在从计时器构建两个可观察对象:
var observables = new []
{
Observable.FromEventPattern
<System.Timers.ElapsedEventHandler, System.Timers.ElapsedEventArgs>
(h => timers[0].Elapsed += h, h => timers[0].Elapsed -= h),
Observable.FromEventPattern
<System.Timers.ElapsedEventHandler, System.Timers.ElapsedEventArgs>
(h => timers[1].Elapsed += h, h => timers[1].Elapsed -= h),
};
然后将它们合并成一个单独的可观察对象:
var observable = observables.Merge();
最后订阅看看会发生什么:
observable
.Subscribe(ep =>
{
Console.WriteLine("Start: " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(2000);
Console.WriteLine("End: " + Thread.CurrentThread.ManagedThreadId);
});
我得到的结果是这样的:
Start: 15
End: 15
Start: 11
End: 11
Start: 14
End: 14
Start: 10
End: 10
Start: 4
End: 4
Start: 13
End: 13
Start: 12
End: 12
Start: 3
End: 3
因此,即使我的计时器彼此异步触发,Rx 仍确保单个订阅始终连续处理每个值,即使它们来自不同的线程。
如果您有两个并行处理同一个 Observable 运行ning 的 OnNext 处理程序,您将无法再保证 Observable 提供的值按照它们生成的顺序进行处理。正如 Enigmativity 所说,这是 Rx 合约的一部分。出于这个原因,正如您在 Enigmativity 的示例中看到的那样,Rx 序列化了通知。
将 ObserveOn 与 Threadpool 或 NewThread 调度程序一起使用并不意味着每个通知都可以 运行 并行。这意味着通知处理代码可以 运行 在与通知生成者不同的上下文中串联。
如果你真的想并行处理通知,并接受竞争条件的后果,那么你总是可以在通知处理程序中生成新的线程池任务或类似任务。通常这不是您真正想要做的。
如果我说
void ChangeQuote(string symbol, double bid, double ask)
{
}
string[] TechETF = {"AAPL", "MSFT"};
var quotesSubscription = quotesObservable.Where(quote => (TechETF.Contains(quote.Symbol)));
quotesSubscription.Subscribe(quote => this.ChangeQuote(quote.Symbol, quote.Bid, quote.Ask));
如果底层事件是异步触发的,subscribe()
中的这段代码是否也在它们自己的线程(线程池)上异步处理这些回调?或者此代码是否序列化(阻止)这些回调的处理?
Rx 总是序列化调用。这是行为契约的一部分。
这里有一个例子来说明这种情况。
从两个计时器开始:
var timers = new []
{
new System.Timers.Timer()
{
Interval = 200.0,
AutoReset = true,
Enabled = true,
},
new System.Timers.Timer()
{
Interval = 250.0,
AutoReset = true,
Enabled = true,
},
};
现在从计时器构建两个可观察对象:
var observables = new []
{
Observable.FromEventPattern
<System.Timers.ElapsedEventHandler, System.Timers.ElapsedEventArgs>
(h => timers[0].Elapsed += h, h => timers[0].Elapsed -= h),
Observable.FromEventPattern
<System.Timers.ElapsedEventHandler, System.Timers.ElapsedEventArgs>
(h => timers[1].Elapsed += h, h => timers[1].Elapsed -= h),
};
然后将它们合并成一个单独的可观察对象:
var observable = observables.Merge();
最后订阅看看会发生什么:
observable
.Subscribe(ep =>
{
Console.WriteLine("Start: " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(2000);
Console.WriteLine("End: " + Thread.CurrentThread.ManagedThreadId);
});
我得到的结果是这样的:
Start: 15
End: 15
Start: 11
End: 11
Start: 14
End: 14
Start: 10
End: 10
Start: 4
End: 4
Start: 13
End: 13
Start: 12
End: 12
Start: 3
End: 3
因此,即使我的计时器彼此异步触发,Rx 仍确保单个订阅始终连续处理每个值,即使它们来自不同的线程。
如果您有两个并行处理同一个 Observable 运行ning 的 OnNext 处理程序,您将无法再保证 Observable 提供的值按照它们生成的顺序进行处理。正如 Enigmativity 所说,这是 Rx 合约的一部分。出于这个原因,正如您在 Enigmativity 的示例中看到的那样,Rx 序列化了通知。
将 ObserveOn 与 Threadpool 或 NewThread 调度程序一起使用并不意味着每个通知都可以 运行 并行。这意味着通知处理代码可以 运行 在与通知生成者不同的上下文中串联。
如果你真的想并行处理通知,并接受竞争条件的后果,那么你总是可以在通知处理程序中生成新的线程池任务或类似任务。通常这不是您真正想要做的。