Why does ReplaySubject behave differently when subscribed to different observables that produce the same sequence?
Example 1 creates the observable with Observable.Interval:
var observable1 = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);
var replaySubject1 = new ReplaySubject<long>();
observable1.Subscribe(replaySubject1); // subscribe
replaySubject1.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
replaySubject1.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
replaySubject1.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));
The output is:
first:0
second:0
third:0
first:1
second:1
third:1
first:2
second:2
third:2
Example 2 creates the observable with Observable.Create:
var observable2 = Observable.Create<long>(observer =>
{
    for (var i = 0; i <= 2; i++)
    {
        observer.OnNext(i);
        Thread.Sleep(1000);
    }
    observer.OnCompleted();
    return Disposable.Empty;
});
var replaySubject2 = new ReplaySubject<long>(TimeSpan.FromMinutes(1));
observable2.Subscribe(replaySubject2); // subscribe
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));
The output is:
first:0
first:1
first:2
second:0
second:1
second:2
third:0
third:1
third:2
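I assumed these examples would produce the same output, but I was wrong. Why?
This happens because the code inside Observable.Create runs synchronously when you subscribe the ReplaySubject to the observable. The call observable2.Subscribe(replaySubject2) does not return until the loop has finished, so all three values and the completion are already buffered before the first observer subscribes, and each observer then receives the whole replayed sequence at once. Observable.Interval, by contrast, produces its values on a timer, so Subscribe returns immediately and all three observers are attached before the first value arrives.
Try this asynchronous version: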
var observable2 = Observable.Create<long>(async observer =>
{
    for (var i = 0; i <= 2; i++)
    {
        observer.OnNext(i);
        await Task.Delay(1000);
    }
    observer.OnCompleted();
    return Disposable.Empty;
});
var replaySubject2 = new ReplaySubject<long>(TimeSpan.FromMinutes(1));
observable2.Subscribe(replaySubject2); // subscribe
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));
Alternatively, take a look at SubscribeOn/ObserveOn.
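As a rough illustration of the SubscribeOn route, here is a minimal sketch that keeps the blocking producer from Example 2 unchanged and only moves the subscription onto another thread. It assumes the System.Reactive package; TaskPoolScheduler.Default is just one possible scheduler choice, and the final Wait() call is my addition to keep a console process alive until the sequence completes.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Program
{
    static void Main()
    {
        // Same blocking producer as in Example 2.
        var observable2 = Observable.Create<long>(observer =>
        {
            for (var i = 0; i <= 2; i++)
            {
                observer.OnNext(i);
                Thread.Sleep(1000);
            }
            observer.OnCompleted();
            return Disposable.Empty;
        });

        var replaySubject2 = new ReplaySubject<long>(TimeSpan.FromMinutes(1));

        // SubscribeOn runs the subscription (and therefore the loop above)
        // on a task-pool thread, so this call returns without waiting for it.
        // The scheduler choice here is an assumption, not the only option.
        observable2
            .SubscribeOn(TaskPoolScheduler.Default)
            .Subscribe(replaySubject2);

        replaySubject2.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
        replaySubject2.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
        replaySubject2.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));

        // Block the main thread until the subject completes (added for the demo).
        replaySubject2.Wait();
    }
}
```

With this change the three observers should be attached while the producer is still running, so the values are expected to arrive grouped by value, as in Example 1, rather than grouped by subscriber. The exact ordering within each group can depend on thread timing.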