Rx DelayOnTrue 扩展实现

Rx DelayOnTrue extension implementation

我正在尝试在 C# 中实现 bool 值的扩展。该实现应该直接传递假值,但是当接收到真值时,它应该延迟输出带有前缀延迟的真值。

弹珠图应该是:

      |Delay| |Delay|
  F---T---F---T----T---T---F---C
  -F-----------------T------F---C

备注:

我目前的实现是

public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay)
{
    return source.DistinctUntilChanged().Select(value =>  value
            ? Observable.Never<bool>().StartWith(true).Delay(delay)
            : Observable.Never<bool>().StartWith(false))
        .Switch();
}

但是好像不行,因为真值后没有正确取消假值后。我是 Rx 的新手,所以可能有更好的方法来实现这个扩展。

这将用于检查多个服务器应用程序的 IObservable CanProccessMoreJobs 属性,在快速输出更改后仅向服务器添加更多作业,至少延迟值为真值。

这是一个有趣的问题,我想知道它是否属于 XY 类问题。无论如何,鉴于您的弹珠图,您几乎肯定不想要 DistinctUntilChanged() 否则您将永远不会重复 "F"s.

我对此的方法(毫无疑问还有其他方法)是安排所需的输出,确保删除任何获得 "throttled out" 的值(即在延迟期内 T 后跟 F)。这可以这样实现:

public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay, IScheduler scheduler)
{
    return Observable.Create<bool>(
        observer =>
        {
            var serialDisposable = new SerialDisposable();

            var delays = source
                .Materialize()
                .Scan(
                    (Notification: (Notification<bool>)null, Delay: 0L),
                    (seed, source) => source.Kind switch
                        {
                            NotificationKind.OnCompleted => (Notification.CreateOnCompleted<bool>(), seed.Delay),
                            NotificationKind.OnError => (Notification.CreateOnError<bool>(source.Exception), 0),
                            _ => source.Value
                                ? (Notification.CreateOnNext(source.Value), delay.Ticks + 1)
                                : (Notification.CreateOnNext(source.Value), 1)
                        })
                .Where(tuple => tuple.Notification != null)
                .Publish();

            // Emit values after the delay, cancelling an items that are throttled
            var onNext = delays
                .Where(tuple => tuple.Notification.Kind == NotificationKind.OnNext)
                .Subscribe(tuple => serialDisposable.Disposable = scheduler.Schedule(scheduler.Now.AddTicks(tuple.Delay), () => observer.OnNext(tuple.Notification.Value)));

            // Emit completion after delay of last item to be emitted
            var onCompleted = delays
                .Where(tuple => tuple.Notification.Kind == NotificationKind.OnCompleted)
                .Subscribe(tuple => scheduler.Schedule(scheduler.Now.AddTicks(tuple.Delay), () => observer.OnCompleted()));

            // Emit errors immediately, cancelling any pending items
            var onError = delays
                .Where(tuple => tuple.Notification.Kind == NotificationKind.OnError)
                .Subscribe(tuple => serialDisposable.Disposable = scheduler.Schedule(TimeSpan.Zero, () => observer.OnError(tuple.Notification.Exception)));

            return new CompositeDisposable(new IDisposable[] { onNext, onCompleted, onError, delays.Connect(), serialDisposable });
        }
    );
}

由于需要在先前延迟的项目(我们使用 Scan 元组跟踪)之后处理完成,因此看起来有点复杂。

无论如何,请注意 scheduler 参数的添加。向 Rx 添加任何形式的异步时,应始终提供 IScheduler 参数,但可以默认为 Scheduler.Default,如下所示:

public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay)
{
    return source.ThrottleOnTrue(delay, Scheduler.Default);
}

现在,可以使用 TestScheduler 提供的 "virtual time" 来证明 ThrottleOnTrue 可以正常工作(弹珠图略有不同)。这是一个显示立即发出 "F" 值的测试(当前时间 + 1 个调度刻度):

private static long SchedulerOffset = ReactiveTest.Created + ReactiveTest.Subscribed;
private static long NotificationOffset = ReactiveTest.Subscribed;

/// <summary>
/// source:   F---F---F-C
/// expected: -F---F---F-C
/// </summary>
[Test]
public void ShouldDirectlyPassFalseValues()
{
    var scheduler = new TestScheduler();

    var source = new[]
    {
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext(false)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(false)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext(false)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + 1, Notification.CreateOnCompleted<bool>())
    };

    var expected = new[]
    {
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 2, Notification.CreateOnCompleted<bool>())
    };

    var xs = scheduler
        .CreateColdObservable(source)
        .ThrottleOnTrue(TimeSpan.FromMinutes(1), scheduler);

    var observed = scheduler.Start(() => xs, TimeSpan.FromSeconds(3).Ticks + SchedulerOffset + 2);

    CollectionAssert.AreEqual(expected, observed.Messages);
}

这是一个测试,显示 "T" 值在预期延迟后发出(并且取消了之前的 "T" 值):

/// <summary>
/// source:   T---T---T-C
/// expected: --------{delay}-T-C
/// </summary>
[Test]
public void ShouldDelayAndThrottleTrueValues()
{
    var scheduler = new TestScheduler();

    var source = new[]
    {
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext(true)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(true)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext(true)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + 1, Notification.CreateOnCompleted<bool>())
    };

    var expected = new[]
    {
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + NotificationOffset + 1, Notification.CreateOnNext(true)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + NotificationOffset + 2, Notification.CreateOnCompleted<bool>())
    };

    var xs = scheduler
        .CreateColdObservable(source)
        .ThrottleOnTrue(TimeSpan.FromMinutes(1), scheduler);

    var observed = scheduler.Start(() => xs, TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + SchedulerOffset);

    CollectionAssert.AreEqual(expected, observed.Messages);
}

最后,这是一个显示 "T" 值被后续 "F" 值抵消的测试:

/// <summary>
/// source:   F---T---F-C
/// expected: -F-------F-C
/// </summary>
[Test]
public void ShouldIgnoreTrueWhenFollowedByFalseWithinDelay()
{
    var scheduler = new TestScheduler();

    var source = new[]
    {
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext(false)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(true)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext(false)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + 1, Notification.CreateOnCompleted<bool>())
    };

    var expected = new[]
    {
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
        new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 2, Notification.CreateOnCompleted<bool>())
    };

    var xs = scheduler
        .CreateColdObservable(source)
        .ThrottleOnTrue(TimeSpan.FromMinutes(1), scheduler);

    var observed = scheduler.Start(() => xs, TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + SchedulerOffset);

    CollectionAssert.AreEqual(expected, observed.Messages);
}

很确定这就是您要找的东西,但由于您最初的弹珠图有点自相矛盾,所以我不能 100% 确定。不管怎样,我希望它能有所帮助。

你的弹珠图和问题描述的文字在我看来似乎不符。另外,您编写的代码似乎与其中任何一个都不匹配!

所以我认为您希望始终立即生成一个 false 值并发出。我还认为,当出现 true 时,您将延迟 delay 参数,如果同时没有其他值出现,则只输出 true 。如果有人这样做,您只需遵循相同的规则即可。

这主要基于您的代码。

你没有说为什么你认为你的代码不起作用,所以我测试了它。

这是我的代码,可将弹珠图转换为可观察对象,然后通过您的运算符运行它:

var marble = "F---T---F---T----T---T---F---C";

Observable
    .Generate(
        0,
        x => marble[x] != 'C',
        x => x + 1,
        x => marble[x] == '-' ? (bool?)null : (marble[x] == 'T' ? true : false),
        x => TimeSpan.FromSeconds(1.0))
    .Where(x => x != null)
    .Select(x => x.Value)
    .ThrottleOnTrue(TimeSpan.FromSeconds(5.0))
    .Timestamp()

您的代码产生了这个:

2020/06/15 01:16:23 +00:00 False 
2020/06/15 01:16:31 +00:00 False 
2020/06/15 01:16:40 +00:00 True 
2020/06/15 01:16:48 +00:00 False 

但观察从未结束。

我建议像这样重写您的代码:

public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay) =>
    source
        .Select(value =>
            Observable
                .Return(value)
                .Delay(value ? delay : TimeSpan.Zero))
        .Switch();

这样我得到了相同的结果,但它完成了。希望你能解决这个问题。