并行处理 Rx 事件

Parallel handling Rx events

如果我说

void ChangeQuote(string symbol, double bid, double ask)
{
}

string[] TechETF = {"AAPL", "MSFT"};
var quotesSubscription = quotesObservable.Where(quote => (TechETF.Contains(quote.Symbol)));

quotesSubscription.Subscribe(quote => this.ChangeQuote(quote.Symbol, quote.Bid, quote.Ask));

如果底层事件是异步触发的,subscribe() 中的这段代码是否也在它们自己的线程(线程池)上异步处理这些回调?或者此代码是否序列化(阻止)这些回调的处理?

Rx 总是序列化调用。这是行为契约的一部分。

这里有一个例子来说明这种情况。

从两个计时器开始:

var timers = new []
{
    new System.Timers.Timer()
    {
        Interval = 200.0,
        AutoReset = true,
        Enabled = true,
    },
    new System.Timers.Timer()
    {
        Interval = 250.0,
        AutoReset = true,
        Enabled = true,
    },
};

现在从计时器构建两个可观察对象:

var observables = new []
{
    Observable.FromEventPattern
        <System.Timers.ElapsedEventHandler, System.Timers.ElapsedEventArgs>
        (h => timers[0].Elapsed += h, h => timers[0].Elapsed -= h),
    Observable.FromEventPattern
        <System.Timers.ElapsedEventHandler, System.Timers.ElapsedEventArgs>
        (h => timers[1].Elapsed += h, h => timers[1].Elapsed -= h),
};

然后将它们合并成一个单独的可观察对象:

var observable = observables.Merge();

最后订阅看看会发生什么:

observable
    .Subscribe(ep =>
    {
        Console.WriteLine("Start: " + Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(2000);
        Console.WriteLine("End: " + Thread.CurrentThread.ManagedThreadId);
    });

我得到的结果是这样的:

Start: 15
End: 15
Start: 11
End: 11
Start: 14
End: 14
Start: 10
End: 10
Start: 4
End: 4
Start: 13
End: 13
Start: 12
End: 12
Start: 3
End: 3

因此,即使我的计时器彼此异步触发,Rx 仍确保单个订阅始终连续处理每个值,即使它们来自不同的线程。

如果您有两个并行处理同一个 Observable 运行ning 的 OnNext 处理程序,您将无法再保证 Observable 提供的值按照它们生成的顺序进行处理。正如 Enigmativity 所说,这是 Rx 合约的一部分。出于这个原因,正如您在 Enigmativity 的示例中看到的那样,Rx 序列化了通知。

将 ObserveOn 与 Threadpool 或 NewThread 调度程序一起使用并不意味着每个通知都可以 运行 并行。这意味着通知处理代码可以 运行 在与通知生成者不同的上下文中串联。

如果你真的想并行处理通知,并接受竞争条件的后果,那么你总是可以在通知处理程序中生成新的线程池任务或类似任务。通常这不是您真正想要做的。