使用 RX 压缩重复流

Zipping a repeated stream with RX

我有两个热门的可观察对象,我不想错过任何通知

第一个 observable 产生数字

1-2-3-4

和第二个字符串

a-b

我正在寻找一种压缩它们以生成下一个输出的方法

a-1 b-2 a-3 b-4 a-5

所以我想要的是无限期地重复第二个流,直到取消订阅第一个流。我尝试了类似下一个测试的方法,它产生了所需的输出。但是我不会 Repeat 永远,因为我不知道如何停止以测试结果。此外,我使用了手动完成的 ReplaySubject,这意味着我无法收到任何新通知。

    class MyClass:ReactiveTest{
        [Fact]
        public void MethodName4() {
            var strings = new ReplaySubject<string>();
            var testScheduler = new TestScheduler();
            testScheduler.Schedule("", TimeSpan.FromTicks(10), (scheduler, s) => {
                strings.OnNext("a");
                strings.OnNext("b");
                strings.OnCompleted();
            });

            var numbers = testScheduler.CreateHotObservable(OnNext(100, 1), OnNext(200, 2), OnNext(300, 3), OnNext(400, 4), OnNext(500, 5));
            numbers.Zip(Observable.Defer(() => strings).Repeat(3), (i, s) => (i, s)).Subscribe();
            testScheduler.AdvanceBy(500);
        }

    }

我想我找到了一个我将尝试描述的解决方案。

重复一个热的 Observable 是没有意义的,因为它永远不会完成。但是,您可以重复此热可观察对象的 window。我从 @Enigmativity 重构了这个 answer 以重复所有值而不是最后一个值。

    public static IObservable<T> RepeatAllDuringSilence<T>(this IObservable<T> source, TimeSpan maxQuietPeriod,
        IScheduler scheduler=null, IObservable<T> inner=null) {
        if (scheduler==null)
            scheduler=Scheduler.Default;
        if (inner == null)
            inner = source;
        return Observable.Create<T>(observer => {
            var replay = inner.Replay(scheduler);
            var sequence = source.Select(x => Observable.Interval(maxQuietPeriod, scheduler).SelectMany(l => replay).StartWith(x)).Switch();
            return new CompositeDisposable(replay.Connect(), sequence.Subscribe(observer));
        });
    }

然后在我的测试中使用此方法,我设法在没有完成 ReplaySubject 或丢失未来通知的情况下获得预期结果,因为我更改了原始可观察对象的温度。

class MyClass : ReactiveTest {
    [Fact]
    public void MethodName4() {
        var strings = new Subject<string>();
        var testScheduler = new TestScheduler();
        testScheduler.Schedule("", TimeSpan.FromTicks(10), (scheduler, s) => {
            strings.OnNext("a");
            strings.OnNext("b");
        });
        testScheduler.Schedule("", TimeSpan.FromTicks(20), (scheduler, s) => {
            strings.OnNext("c");
        });

        var numbers = testScheduler.CreateHotObservable(OnNext(10, 1), OnNext(20, 2), OnNext(30, 3), OnNext(40, 4), OnNext(50, 5));
        numbers.Zip(strings.RepeatAllDuringSilence(TimeSpan.FromTicks(10),testScheduler), (i, s) => (i, s)).Subscribe(tuple => WriteLine(tuple));
        testScheduler.AdvanceBy(50);
    }

}

(1, a)
(2, b)
(3, c)
(4, a)
(5, b)

这不能解决您的问题吗? :

  void TestMergedPairs()
    {
        var source1 = Observable.Range(1,9999);
        var source2 = Observable.Range(65, 2).Repeat();


        var z = source1.Zip(source2,(x,y)=> { return new { left = x, right = y }; })
             .TakeWhile((v)=>v.left<1000);


        z.Subscribe(x => { Console.WriteLine( x.left +" "+x.right ); });
    }

如果不是,那么不清楚你的问题是什么。如果您希望能够继续重复第二个可观察对象的历史内容,那么您的 "a","b" 稍后可能会变成 "a","b","c" 然后应该迭代 3 个字母序列,那么也许您的来源可能是聚合历史 windows.

的 .Scan 的结果