合并两个 IObservable 时的意外行为
Unexpected behaviour when merging two IObservables
我最近的"Play With Rx"-project中有以下情况:
class Program
{
static void Main(string[] args)
{
var observable1 = Observable.Create<int>(
(Func<IObserver<int>, IDisposable>)GenerateSequence);
var observable2 = Observable.Create<int>(
(Func<IObserver<int>, IDisposable>)GenerateSequence);
var merged = observable1.Merge(observable2);
observable1.Subscribe(i => Console.WriteLine("1: " + i.ToString()));
observable2.Subscribe(i => Console.WriteLine("2: " + i.ToString()));
merged.Subscribe(i => Console.WriteLine("Merged: " + i.ToString()));
Console.ReadLine();
}
private static int count = 0;
private static IDisposable GenerateSequence(IObserver<int> observer)
{
ThreadPool.QueueUserWorkItem((o) =>
{
while (true)
{
observer.OnNext(count++);
Thread.Sleep(500);
}
});
return Disposable.Empty;
}
}
现在,我希望看到类似
的内容
1: 0
2: 1
Merged: 0
Merged: 1
1: 2
2: 3
Merged: 2
Merged: 3
相反,我看到了
1: 0
2: 1
Merged: 2
Merged: 3
1: 4
2: 5
Merged: 6
Merged: 7
如果我用
替换循环
while (true)
{
observer.OnNext(r.Next(0, 1000));
Thread.Sleep(500);
}
对于 Random 的静态或本地实例 r,合并后的序列中有两个单独序列中的其他数字!
我不明白 count++
或 r.Next(0, 1000)
如何可以从 observer.OnNext(...)
的一次调用中多次执行。 Merge 怎么办我不明白?
P.S.: 我曾尝试通过锁或分隔两个线程的循环时间来消除竞争条件,但结果没有改变,所以我将这些尝试排除在外。
编辑: 似乎 GenerateSequence
被调用了 4 次,因此 4 个线程被旋转以增加 count
。虽然这解释了我所看到的,但我不明白为什么会这样。
- 当您订阅
observable1
时,您订阅 Observable.Create(GenerateSequence)
,它会调用 GenerateSequence
并开始循环。
- 当您订阅
observable2
时,您订阅了 Observable.Create(GenerateSequence)
,它会调用 GenerateSequence
并开始循环。
- 当您订阅
merged
时,您订阅了 Observable.Merge(observable1, observable2)
,后者订阅了 observable1
和 observable2
。我们在前两点中看到了当您执行其中每一项时会发生什么。
最终结果是对 GenerateSequence
.
的四次调用
要获得与您正在寻找的效果非常接近的效果,您需要查看 Publish()
:
var observable1 = Observable
.Create<int>((Func<IObserver<int>, IDisposable>)GenerateSequence)
.Publish();
var observable2 = Observable
.Create<int>((Func<IObserver<int>, IDisposable>)GenerateSequence)
.Publish();
var merged = observable1.Merge(observable2);
observable1.Subscribe(i => Console.WriteLine("1: " + i.ToString()));
observable2.Subscribe(i => Console.WriteLine("2: " + i.ToString()));
merged.Subscribe(i => Console.WriteLine("Merged: " + i.ToString()));
observable1.Connect();
observable2.Connect();
observable1
和 observable2
现在属于 IConnectableObservable
类型,这意味着它们推迟订阅其基础 IObservable
(在您的情况下为 Observable.Create
)直到他们 Connect
打电话。
输出
1: 0
Merged: 0
2: 1
Merged: 1
1: 2
Merged: 2
2: 3
Merged: 3
...
我最近的"Play With Rx"-project中有以下情况:
class Program
{
static void Main(string[] args)
{
var observable1 = Observable.Create<int>(
(Func<IObserver<int>, IDisposable>)GenerateSequence);
var observable2 = Observable.Create<int>(
(Func<IObserver<int>, IDisposable>)GenerateSequence);
var merged = observable1.Merge(observable2);
observable1.Subscribe(i => Console.WriteLine("1: " + i.ToString()));
observable2.Subscribe(i => Console.WriteLine("2: " + i.ToString()));
merged.Subscribe(i => Console.WriteLine("Merged: " + i.ToString()));
Console.ReadLine();
}
private static int count = 0;
private static IDisposable GenerateSequence(IObserver<int> observer)
{
ThreadPool.QueueUserWorkItem((o) =>
{
while (true)
{
observer.OnNext(count++);
Thread.Sleep(500);
}
});
return Disposable.Empty;
}
}
现在,我希望看到类似
的内容1: 0
2: 1
Merged: 0
Merged: 1
1: 2
2: 3
Merged: 2
Merged: 3
相反,我看到了
1: 0
2: 1
Merged: 2
Merged: 3
1: 4
2: 5
Merged: 6
Merged: 7
如果我用
替换循环while (true)
{
observer.OnNext(r.Next(0, 1000));
Thread.Sleep(500);
}
对于 Random 的静态或本地实例 r,合并后的序列中有两个单独序列中的其他数字!
我不明白 count++
或 r.Next(0, 1000)
如何可以从 observer.OnNext(...)
的一次调用中多次执行。 Merge 怎么办我不明白?
P.S.: 我曾尝试通过锁或分隔两个线程的循环时间来消除竞争条件,但结果没有改变,所以我将这些尝试排除在外。
编辑: 似乎 GenerateSequence
被调用了 4 次,因此 4 个线程被旋转以增加 count
。虽然这解释了我所看到的,但我不明白为什么会这样。
- 当您订阅
observable1
时,您订阅Observable.Create(GenerateSequence)
,它会调用GenerateSequence
并开始循环。 - 当您订阅
observable2
时,您订阅了Observable.Create(GenerateSequence)
,它会调用GenerateSequence
并开始循环。 - 当您订阅
merged
时,您订阅了Observable.Merge(observable1, observable2)
,后者订阅了observable1
和observable2
。我们在前两点中看到了当您执行其中每一项时会发生什么。
最终结果是对 GenerateSequence
.
要获得与您正在寻找的效果非常接近的效果,您需要查看 Publish()
:
var observable1 = Observable
.Create<int>((Func<IObserver<int>, IDisposable>)GenerateSequence)
.Publish();
var observable2 = Observable
.Create<int>((Func<IObserver<int>, IDisposable>)GenerateSequence)
.Publish();
var merged = observable1.Merge(observable2);
observable1.Subscribe(i => Console.WriteLine("1: " + i.ToString()));
observable2.Subscribe(i => Console.WriteLine("2: " + i.ToString()));
merged.Subscribe(i => Console.WriteLine("Merged: " + i.ToString()));
observable1.Connect();
observable2.Connect();
observable1
和 observable2
现在属于 IConnectableObservable
类型,这意味着它们推迟订阅其基础 IObservable
(在您的情况下为 Observable.Create
)直到他们 Connect
打电话。
输出
1: 0
Merged: 0
2: 1
Merged: 1
1: 2
Merged: 2
2: 3
Merged: 3
...