Rx .NET 首先获取并在时间间隔或条件之后跳过

Rx .NET take first and skip after time interval or condition

我正在学习 .NET 中的 rx,我有以下要求:

所以给定示例数据:

using System;
using System.Reactive.Linq;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var results = new[]
                {
                 "right", //1
                 "left", //2
                 "right", //3
                 "right", //4
                 "left", //5
                 "right", //6
                 "right", //7
                 "right", //8
                 "left" //9
                };

            var observable =  Observable.Generate(0, 
                x => x < results.Length, 
                x => x + 1,
                x => results[x], 
                x => 
                TimeSpan.FromSeconds(1)).Timestamp();

            observable.Subscribe(...);

            Console.ReadKey();
        }        
    }
}

结果应该是:

right //1
left //2
right //3
left //5
right //6
right //8
left //9

字符串 4 已被跳过,因为它距离最后一个只有 1 秒 "right",字符串 7 也是如此。但是,字符串 8 没有被跳过,因为距离字符串 6 有 2 秒。

可能的解决方案:

我尝试使用窗口函数来跳过条目,但这会跳过所有字符串,即使它们不是相同的值:

observable.Publish(ps =>
           ps.Window(() => ps.Delay(TimeSpan.FromSeconds(2)))
             .SelectMany(x => x.Take(1))).Subscribe(f => Console.WriteLine(f.Value));

我还尝试为每个值添加时间戳并在 DistinctUntilChanged() EqualityComparer 中检查它们,但这似乎也没有按预期工作。

Hm.. 听起来像 observable.DistinctUntilChanged 来检测不同的事件,但通过 CombineLatestobservable.Debounce 合并也可以在一段时间后重复....

这将涵盖基础知识,但我不确定如果在 time-longer-than-debounce 之后出现与先前不同的项目会发生什么情况。源 DistinctUntilChanged 和 Debounce 运算符都会传递项目 "at the same time" 而且我不确定此时 CombineLatest 会做什么。有一个变化,你会得到两次这样的事件(在很短的时间内发生相同的事件),所以你需要再次删除重复数据。

如果您愿意添加某种时间戳,那么也有非常明确的方法,尽管我不确定它是否真的更容易..

  • 获取源事件 - {command} 流
  • GroupBy 项目的类型 - {command} 的子流,每个子流只包含一种类型的命令
  • 对每个子流应用TimeInterval,你得到{command, time-since-last-seen}的子流,每个子流只包含一种类型的命令
  • 将它们全部结合起来,你会得到{command, time-since-THIS-TYPE-was-last-seen}
  • 根据'time'将其转换为{command, null-or-ObjectThatIsAlwaysDifferent},如果时间小于hold-off时间则为NULL,如果时间大于hold-off则使用IsAlwaysDifferent
  • 的一些魔法值
  • DistinctUntilChanged

您应该能够通过简单地使 class 整个 gethashcode returns 0 等于 returns false 来实现神奇的 ObjectThatIsAlwaysDifferent。

这应该 return 具有与前一个命令不同的命令,或者与之前相同但发生时间长于延迟时间的事件。

现在想想,通过压缩当前值和之前的值应该可以很简单地做到这一点:

  • 获取{command}的源码流
  • 添加时间戳 - {command, timestamp}
  • 延迟 1,记住源和延迟
  • 将它们压缩在一起,现在你得到了 [{command, timestamp}, {prevcommand, prevtimestamp}]
  • 使用您的代码对其进行过滤:
    • 当命令 != prevcommand 时通过
    • 当命令 == prevcommand && timestamp-prevtimestamp > holdoff 时通过

a应该就是这样。像往常一样,不止一种方法可以做同样的事情。

我还没有测试过这段代码,但你大概明白了。

        source
            .Select(x => (str: x, timestamp: DateTime.Now))
            .Scan((last: (str: "", timestamp: DateTime.MinValue), next: (str: "", timestamp: DateTime.MinValue)),
                (state, val) =>
                    (last: (str: state.next.str, timestamp: state.next.timestamp), next: (str: val.str, timestamp: val.timestamp))
                )
            .Where(x => (x.next.str != x.last.str) || (x.next.timestamp - x.last.timestamp) > TimeSpan.FromSeconds(2))
            .Select(x=>x.next.str);

这比我想象的要棘手,因为三重情况(对,对,对,相隔一秒)。使用直线 .Zip 在这里不起作用。

这类似于 Sentinel 的回答,并正确处理了三重情况:

source
    .Timestamp()        
    .Scan((state, item) => state == null || item.Timestamp - state.Timestamp > TimeSpan.FromSeconds(2) || state.Value != item.Value
        ? item
        : state
    )
    .DistinctUntilChanged()
    .Select(t => t.Value);

解释:

  • .Timestamp() 用到达的时间戳包装每条消息
  • .Scan(1 arg)如果出现2秒内重复,则重复上一条消息,否则发出新消息
  • .DistinctUntilChanged() 去除重复的消息,这是因为 .Scan 发出两次旧消息
  • .Select 删除时间戳。