合并两个 IObservable 时的意外行为

Unexpected behaviour when merging two IObservables

我最近的"Play With Rx"-project中有以下情况:

class Program
{
    static void Main(string[] args)
    {
        var observable1 = Observable.Create<int>(
               (Func<IObserver<int>, IDisposable>)GenerateSequence);
        var observable2 = Observable.Create<int>(
               (Func<IObserver<int>, IDisposable>)GenerateSequence);
        var merged = observable1.Merge(observable2);

        observable1.Subscribe(i => Console.WriteLine("1: " + i.ToString()));
        observable2.Subscribe(i => Console.WriteLine("2: " + i.ToString()));
        merged.Subscribe(i => Console.WriteLine("Merged: " + i.ToString()));

        Console.ReadLine();
    }

    private static int count = 0;

    private static IDisposable GenerateSequence(IObserver<int> observer)
    {
        ThreadPool.QueueUserWorkItem((o) =>
        {
            while (true)
            {
                observer.OnNext(count++);
                Thread.Sleep(500);
            }
        });
        return Disposable.Empty;
    }
}

现在,我希望看到类似

的内容
1: 0
2: 1
Merged: 0
Merged: 1
1: 2
2: 3
Merged: 2
Merged: 3

相反,我看到了

1: 0
2: 1
Merged: 2
Merged: 3
1: 4
2: 5
Merged: 6
Merged: 7

如果我用

替换循环
while (true)
{
    observer.OnNext(r.Next(0, 1000));
    Thread.Sleep(500);
}

对于 Random 的静态或本地实例 r,合并后的序列中有两个单独序列中的其他数字!

我不明白 count++r.Next(0, 1000) 如何可以从 observer.OnNext(...) 的一次调用中多次执行。 Merge 怎么办我不明白?

P.S.: 我曾尝试通过锁或分隔两个线程的循环时间来消除竞争条件,但结果没有改变,所以我将这些尝试排除在外。

编辑: 似乎 GenerateSequence 被调用了 4 次,因此 4 个线程被旋转以增加 count。虽然这解释了我所看到的,但我不明白为什么会这样。

  • 当您订阅 observable1 时,您订阅 Observable.Create(GenerateSequence),它会调用 GenerateSequence 并开始循环。
  • 当您订阅 observable2 时,您订阅了 Observable.Create(GenerateSequence),它会调用 GenerateSequence 并开始循环。
  • 当您订阅 merged 时,您订阅了 Observable.Merge(observable1, observable2),后者订阅了 observable1observable2。我们在前两点中看到了当您执行其中每一项时会发生什么。

最终结果是对 GenerateSequence.

的四次调用

要获得与您正在寻找的效果非常接近的效果,您需要查看 Publish():

var observable1 = Observable
    .Create<int>((Func<IObserver<int>, IDisposable>)GenerateSequence)
    .Publish();
var observable2 = Observable
    .Create<int>((Func<IObserver<int>, IDisposable>)GenerateSequence)
    .Publish();
var merged = observable1.Merge(observable2);

observable1.Subscribe(i => Console.WriteLine("1: " + i.ToString()));
observable2.Subscribe(i => Console.WriteLine("2: " + i.ToString()));
merged.Subscribe(i => Console.WriteLine("Merged: " + i.ToString()));

observable1.Connect();
observable2.Connect();

observable1observable2 现在属于 IConnectableObservable 类型,这意味着它们推迟订阅其基础 IObservable(在您的情况下为 Observable.Create)直到他们 Connect 打电话。

输出

1: 0
Merged: 0
2: 1
Merged: 1
1: 2
Merged: 2
2: 3
Merged: 3
...