如何根据特定条件通过滑动 window 触发 RX 信号

How to fire RX signal with sliding window based on certain condition

我有一个热的可观察传感器数据流。我需要一个可观察到的信号,该信号仅在传感器值低于 15 一段时间后才会触发。如果在任何时候该值超过 15,它应该重置滑动 window。 我已经让它部分地与下面的代码一起工作 - 但是如果值一直低于 15 它不会触发。

var notification = _sensor.Where(v => v >= 15)
                          .Throttle(new TimeSpan(0, 1, 0))
                          .SelectMany(_ => Observable.Return(Unit.Default));

有什么建议吗?

如果 _sensor 从未发出等于或大于 15 的值,则永远不会调用 Throttle

简单的解决方法是向 _sensornotification

添加唤醒通知
var wakeup = Observable.Return(15);

var notification = _sensor.Merge(wakeup)
                          .Where(v => v >= 15)
                          .Throttle(new TimeSpan(0, 1, 0))
                          .SelectMany(_ => Observable.Return(Unit.Default));

如果我正确理解了您的需求,那么这可行:

var notification = _sensor.Publish(ps => ps
    .Select(x => x >= 15.0)
    .DistinctUntilChanged()
    .Select(p => p
        ? Observable.Empty<double>()
        : Observable
            .Timer(TimeSpan.FromMinutes(1.0))
            .Select(x => -1.0)
            .IgnoreElements()
            .Concat(ps))
    .Switch());

我假设 _sensor 是一个 IObservable<double>

因此,只要值保持在 15.0 以下至少一分钟,此可观察对象就会发布 _sensor 流中的所有值。我已经测试了这个功能。

我的测试代码是:

var random = new Random();
var _sensor = Observable.Generate(
    0,
    x => true,
    x => x,
    x => random.NextDouble() * 16.0,
    x => TimeSpan.FromSeconds(random.NextDouble()));

var published_sensor = _sensor.Publish();

var notification = published_sensor.Publish(ps => ps
    .Select(x => x >= 15.0)
    .DistinctUntilChanged()
    .Select(p => p
        ? Observable.Empty<double>()
        : Observable
            .Timer(TimeSpan.FromSeconds(5.0))
            .Select(x => -1.0)
            .IgnoreElements()
            .Concat(ps))
    .Switch());

published_sensor.Merge(notification).Timestamp().Dump();

published_sensor.Connect();

我得到的结果是:

请注意,在发布重复值之前经过 5 秒,当源生成的值超过 15 时,重复停止。