Rx .NET 首先获取并在时间间隔或条件之后跳过
Rx .NET take first and skip after time interval or condition
我正在学习 .NET 中的 rx,我有以下要求:
- 字符串序列来自 API。他们以不同的时间间隔出现,我不知道。有时一秒来5串,有时5秒只来1串
- 字符串基本上是五个命令:"start"、"stop"、"left"、"right"、"back"。还有其他命令传入,但可以过滤掉
- 程序现在应该在命令进入时执行。
- 如果相同的命令在给定的时间段内(比如2秒)传入,则该命令应该只执行一次.如果在此期间有另一个命令传入,应立即执行。如果在给定的时间段后收到与上一个执行的命令相同的命令,则应该执行它。
- 没有为传入数据分配时间戳(但如果需要可以这样做)。
所以给定示例数据:
using System;
using System.Reactive.Linq;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
var results = new[]
{
"right", //1
"left", //2
"right", //3
"right", //4
"left", //5
"right", //6
"right", //7
"right", //8
"left" //9
};
var observable = Observable.Generate(0,
x => x < results.Length,
x => x + 1,
x => results[x],
x =>
TimeSpan.FromSeconds(1)).Timestamp();
observable.Subscribe(...);
Console.ReadKey();
}
}
}
结果应该是:
right //1
left //2
right //3
left //5
right //6
right //8
left //9
字符串 4 已被跳过,因为它距离最后一个只有 1 秒 "right",字符串 7 也是如此。但是,字符串 8 没有被跳过,因为距离字符串 6 有 2 秒。
可能的解决方案:
我尝试使用窗口函数来跳过条目,但这会跳过所有字符串,即使它们不是相同的值:
observable.Publish(ps =>
ps.Window(() => ps.Delay(TimeSpan.FromSeconds(2)))
.SelectMany(x => x.Take(1))).Subscribe(f => Console.WriteLine(f.Value));
我还尝试为每个值添加时间戳并在 DistinctUntilChanged() EqualityComparer 中检查它们,但这似乎也没有按预期工作。
Hm.. 听起来像 observable.DistinctUntilChanged
来检测不同的事件,但通过 CombineLatest
与 observable.Debounce
合并也可以在一段时间后重复....
这将涵盖基础知识,但我不确定如果在 time-longer-than-debounce 之后出现与先前不同的项目会发生什么情况。源 DistinctUntilChanged 和 Debounce 运算符都会传递项目 "at the same time" 而且我不确定此时 CombineLatest 会做什么。有一个变化,你会得到两次这样的事件(在很短的时间内发生相同的事件),所以你需要再次删除重复数据。
如果您愿意添加某种时间戳,那么也有非常明确的方法,尽管我不确定它是否真的更容易..
- 获取源事件 - {command} 流
- GroupBy 项目的类型 - {command} 的子流,每个子流只包含一种类型的命令
- 对每个子流应用TimeInterval,你得到{command, time-since-last-seen}的子流,每个子流只包含一种类型的命令
- 将它们全部结合起来,你会得到{command, time-since-THIS-TYPE-was-last-seen}
- 根据'time'将其转换为{command, null-or-ObjectThatIsAlwaysDifferent},如果时间小于hold-off时间则为NULL,如果时间大于hold-off则使用IsAlwaysDifferent
的一些魔法值
- DistinctUntilChanged
您应该能够通过简单地使 class 整个 gethashcode returns 0 等于 returns false 来实现神奇的 ObjectThatIsAlwaysDifferent。
这应该 return 具有与前一个命令不同的命令,或者与之前相同但发生时间长于延迟时间的事件。
现在想想,通过压缩当前值和之前的值应该可以很简单地做到这一点:
- 获取{command}的源码流
- 添加时间戳 - {command, timestamp}
- 延迟 1,记住源和延迟
- 将它们压缩在一起,现在你得到了 [{command, timestamp}, {prevcommand, prevtimestamp}]
- 使用您的代码对其进行过滤:
- 当命令 != prevcommand 时通过
- 当命令 == prevcommand && timestamp-prevtimestamp > holdoff 时通过
a应该就是这样。像往常一样,不止一种方法可以做同样的事情。
我还没有测试过这段代码,但你大概明白了。
source
.Select(x => (str: x, timestamp: DateTime.Now))
.Scan((last: (str: "", timestamp: DateTime.MinValue), next: (str: "", timestamp: DateTime.MinValue)),
(state, val) =>
(last: (str: state.next.str, timestamp: state.next.timestamp), next: (str: val.str, timestamp: val.timestamp))
)
.Where(x => (x.next.str != x.last.str) || (x.next.timestamp - x.last.timestamp) > TimeSpan.FromSeconds(2))
.Select(x=>x.next.str);
这比我想象的要棘手,因为三重情况(对,对,对,相隔一秒)。使用直线 .Zip
在这里不起作用。
这类似于 Sentinel 的回答,并正确处理了三重情况:
source
.Timestamp()
.Scan((state, item) => state == null || item.Timestamp - state.Timestamp > TimeSpan.FromSeconds(2) || state.Value != item.Value
? item
: state
)
.DistinctUntilChanged()
.Select(t => t.Value);
解释:
.Timestamp()
用到达的时间戳包装每条消息
.Scan(1 arg)
如果出现2秒内重复,则重复上一条消息,否则发出新消息
.DistinctUntilChanged()
去除重复的消息,这是因为 .Scan
发出两次旧消息
.Select
删除时间戳。
我正在学习 .NET 中的 rx,我有以下要求:
- 字符串序列来自 API。他们以不同的时间间隔出现,我不知道。有时一秒来5串,有时5秒只来1串
- 字符串基本上是五个命令:"start"、"stop"、"left"、"right"、"back"。还有其他命令传入,但可以过滤掉
- 程序现在应该在命令进入时执行。
- 如果相同的命令在给定的时间段内(比如2秒)传入,则该命令应该只执行一次.如果在此期间有另一个命令传入,应立即执行。如果在给定的时间段后收到与上一个执行的命令相同的命令,则应该执行它。
- 没有为传入数据分配时间戳(但如果需要可以这样做)。
所以给定示例数据:
using System;
using System.Reactive.Linq;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
var results = new[]
{
"right", //1
"left", //2
"right", //3
"right", //4
"left", //5
"right", //6
"right", //7
"right", //8
"left" //9
};
var observable = Observable.Generate(0,
x => x < results.Length,
x => x + 1,
x => results[x],
x =>
TimeSpan.FromSeconds(1)).Timestamp();
observable.Subscribe(...);
Console.ReadKey();
}
}
}
结果应该是:
right //1
left //2
right //3
left //5
right //6
right //8
left //9
字符串 4 已被跳过,因为它距离最后一个只有 1 秒 "right",字符串 7 也是如此。但是,字符串 8 没有被跳过,因为距离字符串 6 有 2 秒。
可能的解决方案:
我尝试使用窗口函数来跳过条目,但这会跳过所有字符串,即使它们不是相同的值:
observable.Publish(ps =>
ps.Window(() => ps.Delay(TimeSpan.FromSeconds(2)))
.SelectMany(x => x.Take(1))).Subscribe(f => Console.WriteLine(f.Value));
我还尝试为每个值添加时间戳并在 DistinctUntilChanged() EqualityComparer 中检查它们,但这似乎也没有按预期工作。
Hm.. 听起来像 observable.DistinctUntilChanged
来检测不同的事件,但通过 CombineLatest
与 observable.Debounce
合并也可以在一段时间后重复....
这将涵盖基础知识,但我不确定如果在 time-longer-than-debounce 之后出现与先前不同的项目会发生什么情况。源 DistinctUntilChanged 和 Debounce 运算符都会传递项目 "at the same time" 而且我不确定此时 CombineLatest 会做什么。有一个变化,你会得到两次这样的事件(在很短的时间内发生相同的事件),所以你需要再次删除重复数据。
如果您愿意添加某种时间戳,那么也有非常明确的方法,尽管我不确定它是否真的更容易..
- 获取源事件 - {command} 流
- GroupBy 项目的类型 - {command} 的子流,每个子流只包含一种类型的命令
- 对每个子流应用TimeInterval,你得到{command, time-since-last-seen}的子流,每个子流只包含一种类型的命令
- 将它们全部结合起来,你会得到{command, time-since-THIS-TYPE-was-last-seen}
- 根据'time'将其转换为{command, null-or-ObjectThatIsAlwaysDifferent},如果时间小于hold-off时间则为NULL,如果时间大于hold-off则使用IsAlwaysDifferent 的一些魔法值
- DistinctUntilChanged
您应该能够通过简单地使 class 整个 gethashcode returns 0 等于 returns false 来实现神奇的 ObjectThatIsAlwaysDifferent。
这应该 return 具有与前一个命令不同的命令,或者与之前相同但发生时间长于延迟时间的事件。
现在想想,通过压缩当前值和之前的值应该可以很简单地做到这一点:
- 获取{command}的源码流
- 添加时间戳 - {command, timestamp}
- 延迟 1,记住源和延迟
- 将它们压缩在一起,现在你得到了 [{command, timestamp}, {prevcommand, prevtimestamp}]
- 使用您的代码对其进行过滤:
- 当命令 != prevcommand 时通过
- 当命令 == prevcommand && timestamp-prevtimestamp > holdoff 时通过
a应该就是这样。像往常一样,不止一种方法可以做同样的事情。
我还没有测试过这段代码,但你大概明白了。
source
.Select(x => (str: x, timestamp: DateTime.Now))
.Scan((last: (str: "", timestamp: DateTime.MinValue), next: (str: "", timestamp: DateTime.MinValue)),
(state, val) =>
(last: (str: state.next.str, timestamp: state.next.timestamp), next: (str: val.str, timestamp: val.timestamp))
)
.Where(x => (x.next.str != x.last.str) || (x.next.timestamp - x.last.timestamp) > TimeSpan.FromSeconds(2))
.Select(x=>x.next.str);
这比我想象的要棘手,因为三重情况(对,对,对,相隔一秒)。使用直线 .Zip
在这里不起作用。
这类似于 Sentinel 的回答,并正确处理了三重情况:
source
.Timestamp()
.Scan((state, item) => state == null || item.Timestamp - state.Timestamp > TimeSpan.FromSeconds(2) || state.Value != item.Value
? item
: state
)
.DistinctUntilChanged()
.Select(t => t.Value);
解释:
.Timestamp()
用到达的时间戳包装每条消息.Scan(1 arg)
如果出现2秒内重复,则重复上一条消息,否则发出新消息.DistinctUntilChanged()
去除重复的消息,这是因为.Scan
发出两次旧消息.Select
删除时间戳。