如何根据特定条件通过滑动 window 触发 RX 信号
How to fire RX signal with sliding window based on certain condition
我有一个热的可观察传感器数据流。我需要一个可观察到的信号,该信号仅在传感器值低于 15 一段时间后才会触发。如果在任何时候该值超过 15,它应该重置滑动 window。
我已经让它部分地与下面的代码一起工作 - 但是如果值一直低于 15 它不会触发。
var notification = _sensor.Where(v => v >= 15)
.Throttle(new TimeSpan(0, 1, 0))
.SelectMany(_ => Observable.Return(Unit.Default));
有什么建议吗?
如果 _sensor
从未发出等于或大于 15 的值,则永远不会调用 Throttle
。
简单的解决方法是向 _sensor
或 notification
添加唤醒通知
var wakeup = Observable.Return(15);
var notification = _sensor.Merge(wakeup)
.Where(v => v >= 15)
.Throttle(new TimeSpan(0, 1, 0))
.SelectMany(_ => Observable.Return(Unit.Default));
如果我正确理解了您的需求,那么这可行:
var notification = _sensor.Publish(ps => ps
.Select(x => x >= 15.0)
.DistinctUntilChanged()
.Select(p => p
? Observable.Empty<double>()
: Observable
.Timer(TimeSpan.FromMinutes(1.0))
.Select(x => -1.0)
.IgnoreElements()
.Concat(ps))
.Switch());
我假设 _sensor
是一个 IObservable<double>
。
因此,只要值保持在 15.0 以下至少一分钟,此可观察对象就会发布 _sensor
流中的所有值。我已经测试了这个功能。
我的测试代码是:
var random = new Random();
var _sensor = Observable.Generate(
0,
x => true,
x => x,
x => random.NextDouble() * 16.0,
x => TimeSpan.FromSeconds(random.NextDouble()));
var published_sensor = _sensor.Publish();
var notification = published_sensor.Publish(ps => ps
.Select(x => x >= 15.0)
.DistinctUntilChanged()
.Select(p => p
? Observable.Empty<double>()
: Observable
.Timer(TimeSpan.FromSeconds(5.0))
.Select(x => -1.0)
.IgnoreElements()
.Concat(ps))
.Switch());
published_sensor.Merge(notification).Timestamp().Dump();
published_sensor.Connect();
我得到的结果是:
请注意,在发布重复值之前经过 5 秒,当源生成的值超过 15 时,重复停止。
我有一个热的可观察传感器数据流。我需要一个可观察到的信号,该信号仅在传感器值低于 15 一段时间后才会触发。如果在任何时候该值超过 15,它应该重置滑动 window。 我已经让它部分地与下面的代码一起工作 - 但是如果值一直低于 15 它不会触发。
var notification = _sensor.Where(v => v >= 15)
.Throttle(new TimeSpan(0, 1, 0))
.SelectMany(_ => Observable.Return(Unit.Default));
有什么建议吗?
如果 _sensor
从未发出等于或大于 15 的值,则永远不会调用 Throttle
。
简单的解决方法是向 _sensor
或 notification
var wakeup = Observable.Return(15);
var notification = _sensor.Merge(wakeup)
.Where(v => v >= 15)
.Throttle(new TimeSpan(0, 1, 0))
.SelectMany(_ => Observable.Return(Unit.Default));
如果我正确理解了您的需求,那么这可行:
var notification = _sensor.Publish(ps => ps
.Select(x => x >= 15.0)
.DistinctUntilChanged()
.Select(p => p
? Observable.Empty<double>()
: Observable
.Timer(TimeSpan.FromMinutes(1.0))
.Select(x => -1.0)
.IgnoreElements()
.Concat(ps))
.Switch());
我假设 _sensor
是一个 IObservable<double>
。
因此,只要值保持在 15.0 以下至少一分钟,此可观察对象就会发布 _sensor
流中的所有值。我已经测试了这个功能。
我的测试代码是:
var random = new Random();
var _sensor = Observable.Generate(
0,
x => true,
x => x,
x => random.NextDouble() * 16.0,
x => TimeSpan.FromSeconds(random.NextDouble()));
var published_sensor = _sensor.Publish();
var notification = published_sensor.Publish(ps => ps
.Select(x => x >= 15.0)
.DistinctUntilChanged()
.Select(p => p
? Observable.Empty<double>()
: Observable
.Timer(TimeSpan.FromSeconds(5.0))
.Select(x => -1.0)
.IgnoreElements()
.Concat(ps))
.Switch());
published_sensor.Merge(notification).Timestamp().Dump();
published_sensor.Connect();
我得到的结果是:
请注意,在发布重复值之前经过 5 秒,当源生成的值超过 15 时,重复停止。