使用 Reactive Extensions 动态连接序列
Dynamically concatenating sequences with Reactive Extensions
我想创建一个连接一个或多个动态创建的序列(在 运行 时间内)的序列。
我尝试使用 mySequence = mySequence.Concat(anotherSequence)
,但这会中断当前对 mySequence
的订阅,因为每次都会创建一个新序列。
当您将一个可观察序列连接到另一个序列时,第一个序列必须在您从第二个序列获取任何值之前结束。这听起来更像是您想合并两个或更多序列 - 换句话说,只要该序列产生值,就从该序列获取值。
所以,如果您允许我将 .Concat
更改为 .Merge
,听起来您现在的代码是这样的:
IObservable<long> mySequence = Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5);
IDisposable mySequenceSubscription = mySequence.Subscribe(n => Console.WriteLine(n));
IObservable<long> anotherSequence = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5);
mySequence = mySequence.Merge(anotherSequence);
如果我 运行 我得到这些值:
0
1
2
3
4
第二个序列没有合并。
现在,如果您在创建订阅时不知道要合并的未来可观察对象是什么,那么您可以这样做:
Subject<IObservable<long>> sources = new Subject<System.IObservable<long>>();
IDisposable sourceSubscription = sources.Merge().Subscribe(n => Console.WriteLine(n));
sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5));
sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5));
现在的结果是这样的:
0
1
0
2
3
4
1
2
3
4
这已正确合并两个在订阅完成后添加的可观察对象。简单。
我想创建一个连接一个或多个动态创建的序列(在 运行 时间内)的序列。
我尝试使用 mySequence = mySequence.Concat(anotherSequence)
,但这会中断当前对 mySequence
的订阅,因为每次都会创建一个新序列。
当您将一个可观察序列连接到另一个序列时,第一个序列必须在您从第二个序列获取任何值之前结束。这听起来更像是您想合并两个或更多序列 - 换句话说,只要该序列产生值,就从该序列获取值。
所以,如果您允许我将 .Concat
更改为 .Merge
,听起来您现在的代码是这样的:
IObservable<long> mySequence = Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5);
IDisposable mySequenceSubscription = mySequence.Subscribe(n => Console.WriteLine(n));
IObservable<long> anotherSequence = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5);
mySequence = mySequence.Merge(anotherSequence);
如果我 运行 我得到这些值:
0 1 2 3 4
第二个序列没有合并。
现在,如果您在创建订阅时不知道要合并的未来可观察对象是什么,那么您可以这样做:
Subject<IObservable<long>> sources = new Subject<System.IObservable<long>>();
IDisposable sourceSubscription = sources.Merge().Subscribe(n => Console.WriteLine(n));
sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5));
sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5));
现在的结果是这样的:
0 1 0 2 3 4 1 2 3 4
这已正确合并两个在订阅完成后添加的可观察对象。简单。