仅发出唯一值 - 或者如果自上次发出以来已经过去了一段时间

Emit only unique values - or if a certain period of time has passed since last emit

鉴于我有一个 IObservable<int> values 接收这些值(在指定时间):

Time (secs)  Value
1            4
2            3
3            5
4            5
10           5
11           6

更新:显示时间列只是为了举例说明接收值的时间。 Observable 仅包含简单的 int 值。

我想输出一个值如果:

  1. 与之前的值不同,或者
  2. 距离上一个值已经超过 5 秒

预期输出:4、3、5、5、6

所以像这样:

void Foo(IObservable<int> values)
{
    ...
    values
        .Something()
        .Do(x => Console.WriteLine(x))
        ...
}

我觉得我需要 GroupByUntilChanged()Throttle(TimeSpan.FromSeconds(5)) 的某种组合,但我的 Reactive-fu 有点生疏...

这两个条件本身很简单,合并起来有点棘手:

与之前的值不同:

source.DistintUntilChanged()

如果安静 5 秒则输出:

source
    .TimeInterval()
    .Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
    .Select(timedT => timedT.Value)

对于合并,我会应用一个索引器,合并两个单独的解决方案,然后在索引器上进行过滤以删除重复项,如下所示:

var result = source
    .Select((val, index) => (val, index))           // Add indexer
    .Publish(_indexedSource => Observable.Merge(
        _indexedSource
            .DistinctUntilChanged(t => t.val),

        _indexedSource
            .TimeInterval()
            .Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
            .Select(timedT => timedT.Value)
    ))
    .DistinctUntilChanged(t => t.index)            // Filter out items with same index
    .Select(t => t.val);                           // Remove indexing

如果你想测试它,你需要将调度器传递给TimeInterval函数,它看起来像这样:

var ts = new TestScheduler();

// The sequence described in question.
var source = ts.CreateHotObservable(
    ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
    ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
    ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
    ReactiveTest.OnNext(4 * TimeSpan.TicksPerSecond, 5),
    ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
    ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
    ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);

// solution
var testResult = source
    .Select((val, index) => (val, index))
    .Publish(_indexedSource => Observable.Merge(
        _indexedSource
            .DistinctUntilChanged(t => t.val),
        _indexedSource
            .TimeInterval(ts)
            .Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
            .Select(timedT => timedT.Value)
    ))
    .DistinctUntilChanged(t => t.index)
    .Select(t => t.val);

// Test assertions
var observer = ts.CreateObserver<int>();
testResult.Subscribe(observer);

var expected = ts.CreateHotObservable(
    ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
    ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
    ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
    ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
    ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
    ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
ts.Start();
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);

经过仔细思考,我的一位同事提醒我 .Scan() 和 DistinctUntilChanged() 组合的强大功能。因此,为了后代,我也将在此处添加该解决方案:

var result = source
    .TimeInterval(scheduler)
    .Scan((prev, curr) =>
    {
        if (prev.Value != curr.Value || curr.Interval > TimeSpan.FromSeconds(5))
            return curr;
        else
            return prev;
    })
    .Select(x => x.Value)
    .DistinctUntilChanged();

Reactive Extensions 真正遵循了 Perl 的古老座右铭 TIMTOWTDI。 ;-)