仅发出唯一值 - 或者如果自上次发出以来已经过去了一段时间
Emit only unique values - or if a certain period of time has passed since last emit
鉴于我有一个 IObservable<int> values
接收这些值(在指定时间):
Time (secs) Value
1 4
2 3
3 5
4 5
10 5
11 6
更新:显示时间列只是为了举例说明接收值的时间。 Observable 仅包含简单的 int 值。
我想输出一个值如果:
- 与之前的值不同,或者
- 距离上一个值已经超过 5 秒
预期输出:4、3、5、5、6
所以像这样:
void Foo(IObservable<int> values)
{
...
values
.Something()
.Do(x => Console.WriteLine(x))
...
}
我觉得我需要 GroupByUntilChanged()
和 Throttle(TimeSpan.FromSeconds(5))
的某种组合,但我的 Reactive-fu 有点生疏...
这两个条件本身很简单,合并起来有点棘手:
与之前的值不同:
source.DistintUntilChanged()
如果安静 5 秒则输出:
source
.TimeInterval()
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
对于合并,我会应用一个索引器,合并两个单独的解决方案,然后在索引器上进行过滤以删除重复项,如下所示:
var result = source
.Select((val, index) => (val, index)) // Add indexer
.Publish(_indexedSource => Observable.Merge(
_indexedSource
.DistinctUntilChanged(t => t.val),
_indexedSource
.TimeInterval()
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
))
.DistinctUntilChanged(t => t.index) // Filter out items with same index
.Select(t => t.val); // Remove indexing
如果你想测试它,你需要将调度器传递给TimeInterval
函数,它看起来像这样:
var ts = new TestScheduler();
// The sequence described in question.
var source = ts.CreateHotObservable(
ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(4 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
// solution
var testResult = source
.Select((val, index) => (val, index))
.Publish(_indexedSource => Observable.Merge(
_indexedSource
.DistinctUntilChanged(t => t.val),
_indexedSource
.TimeInterval(ts)
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
))
.DistinctUntilChanged(t => t.index)
.Select(t => t.val);
// Test assertions
var observer = ts.CreateObserver<int>();
testResult.Subscribe(observer);
var expected = ts.CreateHotObservable(
ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
ts.Start();
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);
经过仔细思考,我的一位同事提醒我 .Scan() 和 DistinctUntilChanged() 组合的强大功能。因此,为了后代,我也将在此处添加该解决方案:
var result = source
.TimeInterval(scheduler)
.Scan((prev, curr) =>
{
if (prev.Value != curr.Value || curr.Interval > TimeSpan.FromSeconds(5))
return curr;
else
return prev;
})
.Select(x => x.Value)
.DistinctUntilChanged();
Reactive Extensions 真正遵循了 Perl 的古老座右铭 TIMTOWTDI。 ;-)
鉴于我有一个 IObservable<int> values
接收这些值(在指定时间):
Time (secs) Value
1 4
2 3
3 5
4 5
10 5
11 6
更新:显示时间列只是为了举例说明接收值的时间。 Observable 仅包含简单的 int 值。
我想输出一个值如果:
- 与之前的值不同,或者
- 距离上一个值已经超过 5 秒
预期输出:4、3、5、5、6
所以像这样:
void Foo(IObservable<int> values)
{
...
values
.Something()
.Do(x => Console.WriteLine(x))
...
}
我觉得我需要 GroupByUntilChanged()
和 Throttle(TimeSpan.FromSeconds(5))
的某种组合,但我的 Reactive-fu 有点生疏...
这两个条件本身很简单,合并起来有点棘手:
与之前的值不同:
source.DistintUntilChanged()
如果安静 5 秒则输出:
source
.TimeInterval()
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
对于合并,我会应用一个索引器,合并两个单独的解决方案,然后在索引器上进行过滤以删除重复项,如下所示:
var result = source
.Select((val, index) => (val, index)) // Add indexer
.Publish(_indexedSource => Observable.Merge(
_indexedSource
.DistinctUntilChanged(t => t.val),
_indexedSource
.TimeInterval()
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
))
.DistinctUntilChanged(t => t.index) // Filter out items with same index
.Select(t => t.val); // Remove indexing
如果你想测试它,你需要将调度器传递给TimeInterval
函数,它看起来像这样:
var ts = new TestScheduler();
// The sequence described in question.
var source = ts.CreateHotObservable(
ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(4 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
// solution
var testResult = source
.Select((val, index) => (val, index))
.Publish(_indexedSource => Observable.Merge(
_indexedSource
.DistinctUntilChanged(t => t.val),
_indexedSource
.TimeInterval(ts)
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
))
.DistinctUntilChanged(t => t.index)
.Select(t => t.val);
// Test assertions
var observer = ts.CreateObserver<int>();
testResult.Subscribe(observer);
var expected = ts.CreateHotObservable(
ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
ts.Start();
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);
经过仔细思考,我的一位同事提醒我 .Scan() 和 DistinctUntilChanged() 组合的强大功能。因此,为了后代,我也将在此处添加该解决方案:
var result = source
.TimeInterval(scheduler)
.Scan((prev, curr) =>
{
if (prev.Value != curr.Value || curr.Interval > TimeSpan.FromSeconds(5))
return curr;
else
return prev;
})
.Select(x => x.Value)
.DistinctUntilChanged();
Reactive Extensions 真正遵循了 Perl 的古老座右铭 TIMTOWTDI。 ;-)