重组来自同一反应流的元素

Recombining elements from the same reactive stream

我想实现的可以描述如下:

这个概念是这样的:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

var s = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().AutoConnect();

var s1 = s.Where(x => x % 5 == 0);
var s2 = s.Sample(TimeSpan.FromMilliseconds(1000));

new[] {s1, s2}.Merge()./*Distinct().*/Subscribe(Console.WriteLine, cts.Token);

await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);

原源火爆。如果没有 Distinct,我显然会得到重复的值,它看起来会产生我期望看到的结果。

考虑到第一个派生流不是周期性的,有没有更好的方法?

您可以在源 observable 中附加索引,然后在最终合并的 observable 中应用 DistinctUntilChanged

var withIndex = s.Select((x, i) => (Item : x, Index : i));
var s1 = withIndex.Where(p => p.Item % 5 == 0);
var s2 = withIndex.Sample(TimeSpan.FromMilliseconds(1000));

new[] { s1, s2 }
    .Merge()
    .DistinctUntilChanged(p => p.Index) // discard duplicates
    .Select(p => p.Item) // discard the index
    .Subscribe(Console.WriteLine, cts.Token);

我猜运算符 DistinctUntilChangedDistinct 更轻量级,因为它只缓存最新的元素。