我如何解决此 "reset after time" 可观察对象中的竞争条件?

How can I fix race condition in this "reset after time" observable?

输入是一个可观察值,每次出现问题时都会产生一个值。

作为输出,我想要一个可观察的对象,如果问题存在的时间较长,它会产生一个值。换句话说,如果最后一个问题已过时,我想 "reset" 输出可观察值(不产生值)。

我的解决方案:

// first get an observable producing statusOk values (true = ok, false = not ok)
var okStatusObservable = input.Select(_ => true).Throttle(longerTime)
                  .Merge(input.Select(_ => false));


// we only want event if statusOk=false for a longer time
var outputObservable = okStatusObservable
                  .DistinctUntilChanged() // only changes
                  .Throttle(evenLongerTime) // wait for stable status
                  .Where(_ => _ == false); // only interested in bad status

我认为 okStatusObservable 可能包含竞争条件:如果输入在恰好 longerTime 和第二个合并部分 (Select / false) 的时间间隔接收事件将在第一部分 (Select+Throttle / true) 之前产生一个布尔值,然后这将导致 okStatus 在 99.9% 的时间内为 true,而相反的情况是正确。

(PS:为了从一开始就有状态值,我们可以添加 .StartWith(true) 但这与竞争条件无关紧要。)

执行第一个 observable 的更简洁的方法如下:

var okStatusObservable2 = input
    .Select(_ => Observable.Return(true).Delay(longerTime).StartWith(false))
    .Switch();

解释:对于每个 input 消息,产生一个以 false 开头的 observable,在 longerTime 之后产生一个 true。 Switch 意味着如果你有一个新的 observable,只需切换到它,这将排除最后的 all-clear true

对于您的第二个 observable,除非两个 observable 之间 longerTime 不同,否则第一个 observable 中的每个第一个 false 将导致第二个 observable 中的 false。这是你的意图吗?

此外,您的 Where 搞砸了(应该是 .Where(b => !b).Where(b => b == false).Where(_ => false) 将始终计算为 false,什么都不返回。

除此之外,我认为你的解决方案是合理的。