反应式条件节流运算符

Reactive conditional throttling operator

我正在寻找类似 Throttle 的运算符,但限制行为仅在布尔值为真时应用。因此,给定两个 Observable IObservable<T> valuesIObservable<bool> throttleCondition,我想创建一个执行以下操作的 Observable:

或者,用弹珠图表示(注意4):

           values |   0 1   2 3 4   5 6
throttleCondition | F     T       F
           result |   0 1         4 5 6 

我得到的最接近的是:

public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
    bool cond = false;
    throttleCondition.Subscribe(v => cond = v);
    return self.Window(throttleCondition).Select(obs => cond ? obs.TakeLast(1) : obs).SelectMany(obs => obs);
}

但这不是线程安全的,因为 Select 和订阅之间可能存在竞争。 有没有人有什么建议?也许已经有一个我没有看到的操作员?

编辑: 这是我需要的功能的单元测试:

[TestMethod]
public void TestThrottleWhen()
{
    //Setup
    var scheduler = new TestScheduler();
    Subject<int> numberValues = new Subject<int>();
    Subject<bool> flagValues = new Subject<bool>();

    //Define actions
    scheduler.Schedule(TimeSpan.FromTicks(1), () => flagValues.OnNext(false));
    scheduler.Schedule(TimeSpan.FromTicks(10), () => numberValues.OnNext(0));
    scheduler.Schedule(TimeSpan.FromTicks(20), () => numberValues.OnNext(1));
    scheduler.Schedule(TimeSpan.FromTicks(30), () => flagValues.OnNext(true));
    scheduler.Schedule(TimeSpan.FromTicks(40), () => numberValues.OnNext(2));
    scheduler.Schedule(TimeSpan.FromTicks(50), () => numberValues.OnNext(3));
    scheduler.Schedule(TimeSpan.FromTicks(60), () => numberValues.OnNext(4));
    scheduler.Schedule(TimeSpan.FromTicks(70), () => flagValues.OnNext(false));
    scheduler.Schedule(TimeSpan.FromTicks(71), () => flagValues.OnNext(true));
    scheduler.Schedule(TimeSpan.FromTicks(72), () => flagValues.OnNext(false));
    scheduler.Schedule(TimeSpan.FromTicks(80), () => numberValues.OnNext(5));
    scheduler.Schedule(TimeSpan.FromTicks(90), () => numberValues.OnNext(6));
    var actual = scheduler.Start(() => numberValues.ThrottleWhen(flagValues), 0, 0, 100);

    //Assert
    var expected = new[]
    {
        ReactiveTest.OnNext(10, 0),
        ReactiveTest.OnNext(20, 1),
        ReactiveTest.OnNext(70, 4),
        ReactiveTest.OnNext(80, 5),
        ReactiveTest.OnNext(90, 6)

    };
    ReactiveAssert.AreElementsEqual(expected, actual.Messages);
}

编辑 2: 我最终使用了 Alex 答案的修改版本:

public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
    var isPaused = throttleCondition.Prepend(false).DistinctUntilChanged();
    return Observable.Defer(() =>
    {
        object lockObj = new object();
        bool gateIsOpen = false;
        return Observable.CombineLatest(
                self.Synchronize(lockObj).Do(_ => gateIsOpen = true),
                isPaused.Synchronize(lockObj).Do(paused => gateIsOpen = !paused && gateIsOpen),
                (number, paused) => (number, paused)
            )
            .Where(tuple => !tuple.paused && gateIsOpen)
            .Select(tuple => tuple.number);
    });
}

希望这是最后一次尝试...^^

回到 CombineLatest 方法并结合您的一些解决方案,我想出了一个额外的标志,这将帮助我们避免标志变回的极端情况在没有从可观察值发出的任何值的情况下来回。

bool numberChangedFlag = false;

_numberValues
    .Do(_ => numberChangedFlag = true)
    .CombineLatest(_flagValues.Do(x => numberChangedFlag = !x && numberChangedFlag), (number, flag) => (number, flag))
    .Where(tuple => !tuple.flag && numberChangedFlag)
    .Select(tuple => tuple.number)
    .Subscribe(DoYourMagic);

我已经用你的测试试过了,看起来它有效。

虽然我不喜欢我们需要一个局部辅助变量来解决这个问题,但更深入地创建一个辅助可观察对象似乎会使代码变得过于复杂。

让我知道这次是否有效。 :)

我觉得这样比较干净,但是干净是仁者见仁智者见智:

核心的解决方案如下所示:

public static IObservable<T> ThrottleWhenCore<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{

    return throttleCondition
        .StartWith(false)
        .Join(self,
            b => b ? Observable.Empty<bool>() : throttleCondition,
            t => self,
            (b, t) => (b, t)
        )
        .Where(tuple => !tuple.b)
        .Select(tuple => tuple.t);
}

Join不太好理解:对于从左边引入的每个item observable(这里是throttle Condition)打开一个左边的window。从右边的 observable (self) 引入的每个项目都会打开一个右边的 window。每当左 window 与右 window 在时间上相交时,就会产生一个新值。

当然可以选择什么时候关闭windows:在我们的例子中,self中的任何项目关闭最新的权利window,所以在第一个项目之后,总是有一个并且只有一个权利 window 随时开放。对于 throttleCondition 左侧,当值为 false 时,window 保持打开状态(当 F 为 up 时允许所有值)通过。当 throttleCondition 为真时,我们打开并立即关闭 window,它只允许最近的值通过。

除了打开和关闭油门的情况外,这通过了所有测试用例。在那种情况下,它会在 72 个刻度标记处加倍发射 4。要解决这个问题,您可以添加一个索引器和一个 DistinctUntilChanged:

public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{

    return throttleCondition
        .StartWith(false)
        .Join(self.Select((item, index) => (item, index)),
            b => b ? Observable.Empty<bool>() : throttleCondition,
            t => self,
            (b, t) => (b, t)
        )
        .Select(tuple => tuple.t)
        .DistinctUntilChanged()
        .Select(tuple => tuple.item);
}

如果您需要更多解释,请告诉我。我更喜欢这个而不是 locking/state 变量。

这是另一种可能性,基于@Enigmativity 的评论。考虑这一点的最佳方法是将其分为两个问题:当条件为真时获取最新信息,并在条件为假时自由发射,然后将两者合并在一起。

public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{

    return throttleCondition.Publish(_throttleCondition => self.Publish(_self => Observable.Merge(
        _throttleCondition   //Get latest when true handler
            .Where(b => !b)
            .WithLatestFrom(
                _self.Select((item, index) => (item, index)),
                (_, t) => t
            )
            .DistinctUntilChanged()
            .Select(t => t.item),

        _throttleCondition   //Freely emit when false, default start with false.
            .StartWith(false)
            .Select(b => b ? Observable.Empty<T>(): _self)
            .Switch()

    )));
}

这使用与我的其他答案相同的索引器技巧来防止重复。