组合两个可观察序列

Combining Two Observable Sequences

我最近重新开始使用 RX 编程并想出了一个结合两个序列的问题。

我有序列 o1 看起来像这样:

var o1 = Observable.Interval(TimeSpan.FromSeconds(7))
                   .Select(i => i + 2)
                   .Take(2)
                   .StartWith(1);

我的序列 o2 看起来像这样:

var o2 = Observable.Interval(TimeSpan.FromSeconds(3))
                   .Delay(TimeSpan.FromSeconds(1))
                   .Select(i => i + 2)
                   .Take(4)
                   .StartWith(1);

大致对应这个弹珠图:

o1: 1 - - - - - - 2 - - - - - - 3 -
o2: - a - - b - - c - - d - - e - -
o3: - 1a- - - - - 2c- - - - - - 3e-

我只是在寻找序列 o3 但我似乎无法计算出来。单靠 ZipCombineLatest 都无法胜任这项工作。

您可以结合使用 CombineLatest()Buffer()Where()Select() 来构建可观察对象 o3:

  • CombineLatest() - 只需将两个可观察值 o1o2 组合成您想要的任何数据结构(我使用了 Tuple)。
  • Buffer(2,1) - 建一个“滑动window”可以看到之前和当前的Tuple.
  • Where() - 过滤“滑动 window”,你只会得到一个滑动 window,其中前一个 [= 的第一个元素(来自 o1) 20=] 与当前 Tuple 的第一个元素(同样来自 o1)不同,因此无论 o2 在此期间做了什么,您都知道发生了变化。
  • Select() - 只需 select 当前(或之前)Tuple.

Observable 可能如下所示:

var o1 = Observable.Interval(TimeSpan.FromSeconds(7))
       .Select(i => i + 2)
       .Take(2)
       .StartWith(1)
       .Do(it => {
           Console.WriteLine("-- o1 triggered: "+it);
       });
var o2 = Observable.Interval(TimeSpan.FromSeconds(3))
       .Delay(TimeSpan.FromSeconds(1))
       .Select(i => i + 2)
       .Take(4)
       .StartWith(1)
       .Do(it => {
           Console.WriteLine("-- o2 triggered: "+it);
       });
o1.CombineLatest(o2, Tuple.Create)
    .StartWith(Tuple.Create(0L, 0L))
    .Buffer(2, 1)
    .Do(it => {
        Console.WriteLine("-- After Buffer: "+String.Join(",",it));
    })
    .Where(it => {
        if (it.Count != 2) {
            return false;
        }
        return it[0].Item1 != it[1].Item1;
    })
    .Select(it => it[1])
    .Subscribe(it => {
        Console.WriteLine("Final: "+it);
    });

这将生成以下输出:

-- o1 triggered: 1
-- o2 triggered: 1
-- After Buffer: (0, 0),(1, 1)
Final: (1, 1)
-- o2 triggered: 2
-- After Buffer: (1, 1),(1, 2)
-- o1 triggered: 2
-- After Buffer: (1, 2),(2, 2)
Final: (2, 2)
-- o2 triggered: 3
-- After Buffer: (2, 2),(2, 3)
-- o2 triggered: 4
-- After Buffer: (2, 3),(2, 4)
-- o2 triggered: 5
-- After Buffer: (2, 4),(2, 5)
-- o1 triggered: 3
-- After Buffer: (2, 5),(3, 5)
Final: (3, 5)
-- After Buffer: (3, 5)

您可能需要 adjust/add/remove StartWith() 调用,具体取决于您的实际需求 and/or 更改 Select() 调用以获取之前或当前的 Tuple .

您可能正在搜索 WithLatestFrom 运算符。

Merges two observable sequences into one observable sequence by combining each element from the first source with the latest element from the second source, if any. Starting from Rx.NET 4.0, this will subscribe to second before subscribing to first, to have a latest element readily available in case first emits an element right away.

在其最简单的形式中,它具有以下签名:

public static IObservable<(TFirst First, TSecond Second)> WithLatestFrom<TFirst, TSecond>(
    this IObservable<TFirst> first, IObservable<TSecond> second);

生成的序列发出 ValueTuple<TFirst,TSecond>.

类型的元素

对于两个示例源可观察对象,这按预期工作:

IObservable<long>  o1 =
    Observable
        .Interval(TimeSpan.FromSeconds(7))
        .Select(i => i + 2)
        .Take(2)
        .StartWith(1);

IObservable<char> o2 =
    Observable
        .Interval(TimeSpan.FromSeconds(3))
        .Delay(TimeSpan.FromSeconds(1))
        .Select(i => i + 2)
        .Take(4)
        .StartWith(1)
        .Select(x => (char)('a' + x - 1));

IObservable<string> o3 =
    from x1 in o1
    join x2 in o2
        on Observable.Timer(TimeSpan.FromSeconds(2.0))
        equals Observable.Timer(TimeSpan.FromSeconds(2.0))
    select $"{x1}{x2}";

o3.Subscribe(Console.WriteLine);

我得到的输出是:

1a
2c
3e