使用 RX 压缩重复流
Zipping a repeated stream with RX
我有两个热门的可观察对象,我不想错过任何通知
第一个 observable 产生数字
1-2-3-4
和第二个字符串
a-b
我正在寻找一种压缩它们以生成下一个输出的方法
a-1 b-2 a-3 b-4 a-5
所以我想要的是无限期地重复第二个流,直到取消订阅第一个流。我尝试了类似下一个测试的方法,它产生了所需的输出。但是我不会 Repeat
永远,因为我不知道如何停止以测试结果。此外,我使用了手动完成的 ReplaySubject
,这意味着我无法收到任何新通知。
class MyClass:ReactiveTest{
[Fact]
public void MethodName4() {
var strings = new ReplaySubject<string>();
var testScheduler = new TestScheduler();
testScheduler.Schedule("", TimeSpan.FromTicks(10), (scheduler, s) => {
strings.OnNext("a");
strings.OnNext("b");
strings.OnCompleted();
});
var numbers = testScheduler.CreateHotObservable(OnNext(100, 1), OnNext(200, 2), OnNext(300, 3), OnNext(400, 4), OnNext(500, 5));
numbers.Zip(Observable.Defer(() => strings).Repeat(3), (i, s) => (i, s)).Subscribe();
testScheduler.AdvanceBy(500);
}
}
我想我找到了一个我将尝试描述的解决方案。
重复一个热的 Observable 是没有意义的,因为它永远不会完成。但是,您可以重复此热可观察对象的 window。我从 @Enigmativity 重构了这个 answer 以重复所有值而不是最后一个值。
public static IObservable<T> RepeatAllDuringSilence<T>(this IObservable<T> source, TimeSpan maxQuietPeriod,
IScheduler scheduler=null, IObservable<T> inner=null) {
if (scheduler==null)
scheduler=Scheduler.Default;
if (inner == null)
inner = source;
return Observable.Create<T>(observer => {
var replay = inner.Replay(scheduler);
var sequence = source.Select(x => Observable.Interval(maxQuietPeriod, scheduler).SelectMany(l => replay).StartWith(x)).Switch();
return new CompositeDisposable(replay.Connect(), sequence.Subscribe(observer));
});
}
然后在我的测试中使用此方法,我设法在没有完成 ReplaySubject
或丢失未来通知的情况下获得预期结果,因为我更改了原始可观察对象的温度。
class MyClass : ReactiveTest {
[Fact]
public void MethodName4() {
var strings = new Subject<string>();
var testScheduler = new TestScheduler();
testScheduler.Schedule("", TimeSpan.FromTicks(10), (scheduler, s) => {
strings.OnNext("a");
strings.OnNext("b");
});
testScheduler.Schedule("", TimeSpan.FromTicks(20), (scheduler, s) => {
strings.OnNext("c");
});
var numbers = testScheduler.CreateHotObservable(OnNext(10, 1), OnNext(20, 2), OnNext(30, 3), OnNext(40, 4), OnNext(50, 5));
numbers.Zip(strings.RepeatAllDuringSilence(TimeSpan.FromTicks(10),testScheduler), (i, s) => (i, s)).Subscribe(tuple => WriteLine(tuple));
testScheduler.AdvanceBy(50);
}
}
(1, a)
(2, b)
(3, c)
(4, a)
(5, b)
这不能解决您的问题吗? :
void TestMergedPairs()
{
var source1 = Observable.Range(1,9999);
var source2 = Observable.Range(65, 2).Repeat();
var z = source1.Zip(source2,(x,y)=> { return new { left = x, right = y }; })
.TakeWhile((v)=>v.left<1000);
z.Subscribe(x => { Console.WriteLine( x.left +" "+x.right ); });
}
如果不是,那么不清楚你的问题是什么。如果您希望能够继续重复第二个可观察对象的历史内容,那么您的 "a","b" 稍后可能会变成 "a","b","c" 然后应该迭代 3 个字母序列,那么也许您的来源可能是聚合历史 windows.
的 .Scan 的结果
我有两个热门的可观察对象,我不想错过任何通知
第一个 observable 产生数字
1-2-3-4
和第二个字符串
a-b
我正在寻找一种压缩它们以生成下一个输出的方法
a-1 b-2 a-3 b-4 a-5
所以我想要的是无限期地重复第二个流,直到取消订阅第一个流。我尝试了类似下一个测试的方法,它产生了所需的输出。但是我不会 Repeat
永远,因为我不知道如何停止以测试结果。此外,我使用了手动完成的 ReplaySubject
,这意味着我无法收到任何新通知。
class MyClass:ReactiveTest{
[Fact]
public void MethodName4() {
var strings = new ReplaySubject<string>();
var testScheduler = new TestScheduler();
testScheduler.Schedule("", TimeSpan.FromTicks(10), (scheduler, s) => {
strings.OnNext("a");
strings.OnNext("b");
strings.OnCompleted();
});
var numbers = testScheduler.CreateHotObservable(OnNext(100, 1), OnNext(200, 2), OnNext(300, 3), OnNext(400, 4), OnNext(500, 5));
numbers.Zip(Observable.Defer(() => strings).Repeat(3), (i, s) => (i, s)).Subscribe();
testScheduler.AdvanceBy(500);
}
}
我想我找到了一个我将尝试描述的解决方案。
重复一个热的 Observable 是没有意义的,因为它永远不会完成。但是,您可以重复此热可观察对象的 window。我从 @Enigmativity 重构了这个 answer 以重复所有值而不是最后一个值。
public static IObservable<T> RepeatAllDuringSilence<T>(this IObservable<T> source, TimeSpan maxQuietPeriod,
IScheduler scheduler=null, IObservable<T> inner=null) {
if (scheduler==null)
scheduler=Scheduler.Default;
if (inner == null)
inner = source;
return Observable.Create<T>(observer => {
var replay = inner.Replay(scheduler);
var sequence = source.Select(x => Observable.Interval(maxQuietPeriod, scheduler).SelectMany(l => replay).StartWith(x)).Switch();
return new CompositeDisposable(replay.Connect(), sequence.Subscribe(observer));
});
}
然后在我的测试中使用此方法,我设法在没有完成 ReplaySubject
或丢失未来通知的情况下获得预期结果,因为我更改了原始可观察对象的温度。
class MyClass : ReactiveTest {
[Fact]
public void MethodName4() {
var strings = new Subject<string>();
var testScheduler = new TestScheduler();
testScheduler.Schedule("", TimeSpan.FromTicks(10), (scheduler, s) => {
strings.OnNext("a");
strings.OnNext("b");
});
testScheduler.Schedule("", TimeSpan.FromTicks(20), (scheduler, s) => {
strings.OnNext("c");
});
var numbers = testScheduler.CreateHotObservable(OnNext(10, 1), OnNext(20, 2), OnNext(30, 3), OnNext(40, 4), OnNext(50, 5));
numbers.Zip(strings.RepeatAllDuringSilence(TimeSpan.FromTicks(10),testScheduler), (i, s) => (i, s)).Subscribe(tuple => WriteLine(tuple));
testScheduler.AdvanceBy(50);
}
}
(1, a)
(2, b)
(3, c)
(4, a)
(5, b)
这不能解决您的问题吗? :
void TestMergedPairs()
{
var source1 = Observable.Range(1,9999);
var source2 = Observable.Range(65, 2).Repeat();
var z = source1.Zip(source2,(x,y)=> { return new { left = x, right = y }; })
.TakeWhile((v)=>v.left<1000);
z.Subscribe(x => { Console.WriteLine( x.left +" "+x.right ); });
}
如果不是,那么不清楚你的问题是什么。如果您希望能够继续重复第二个可观察对象的历史内容,那么您的 "a","b" 稍后可能会变成 "a","b","c" 然后应该迭代 3 个字母序列,那么也许您的来源可能是聚合历史 windows.
的 .Scan 的结果