Rx DelayOnTrue 扩展实现
Rx DelayOnTrue extension implementation
我正在尝试在 C# 中实现 bool 值的扩展。该实现应该直接传递假值,但是当接收到真值时,它应该延迟输出带有前缀延迟的真值。
弹珠图应该是:
|Delay| |Delay|
F---T---F---T----T---T---F---C
-F-----------------T------F---C
备注:
- 它应该包含(或不)不同的值
- 仅当真值来自源至少延迟持续时间时才应输出真值。
- 在输出中有一堆真实值的情况下,使用第一个计算延迟。
- 当源发出错误时,这应该立即传递到输出。
我目前的实现是
public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay)
{
return source.DistinctUntilChanged().Select(value => value
? Observable.Never<bool>().StartWith(true).Delay(delay)
: Observable.Never<bool>().StartWith(false))
.Switch();
}
但是好像不行,因为真值后没有正确取消假值后。我是 Rx 的新手,所以可能有更好的方法来实现这个扩展。
这将用于检查多个服务器应用程序的 IObservable CanProccessMoreJobs 属性,在快速输出更改后仅向服务器添加更多作业,至少延迟值为真值。
这是一个有趣的问题,我想知道它是否属于 XY 类问题。无论如何,鉴于您的弹珠图,您几乎肯定不想要 DistinctUntilChanged()
否则您将永远不会重复 "F"s.
我对此的方法(毫无疑问还有其他方法)是安排所需的输出,确保删除任何获得 "throttled out" 的值(即在延迟期内 T 后跟 F)。这可以这样实现:
public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay, IScheduler scheduler)
{
return Observable.Create<bool>(
observer =>
{
var serialDisposable = new SerialDisposable();
var delays = source
.Materialize()
.Scan(
(Notification: (Notification<bool>)null, Delay: 0L),
(seed, source) => source.Kind switch
{
NotificationKind.OnCompleted => (Notification.CreateOnCompleted<bool>(), seed.Delay),
NotificationKind.OnError => (Notification.CreateOnError<bool>(source.Exception), 0),
_ => source.Value
? (Notification.CreateOnNext(source.Value), delay.Ticks + 1)
: (Notification.CreateOnNext(source.Value), 1)
})
.Where(tuple => tuple.Notification != null)
.Publish();
// Emit values after the delay, cancelling an items that are throttled
var onNext = delays
.Where(tuple => tuple.Notification.Kind == NotificationKind.OnNext)
.Subscribe(tuple => serialDisposable.Disposable = scheduler.Schedule(scheduler.Now.AddTicks(tuple.Delay), () => observer.OnNext(tuple.Notification.Value)));
// Emit completion after delay of last item to be emitted
var onCompleted = delays
.Where(tuple => tuple.Notification.Kind == NotificationKind.OnCompleted)
.Subscribe(tuple => scheduler.Schedule(scheduler.Now.AddTicks(tuple.Delay), () => observer.OnCompleted()));
// Emit errors immediately, cancelling any pending items
var onError = delays
.Where(tuple => tuple.Notification.Kind == NotificationKind.OnError)
.Subscribe(tuple => serialDisposable.Disposable = scheduler.Schedule(TimeSpan.Zero, () => observer.OnError(tuple.Notification.Exception)));
return new CompositeDisposable(new IDisposable[] { onNext, onCompleted, onError, delays.Connect(), serialDisposable });
}
);
}
由于需要在先前延迟的项目(我们使用 Scan
元组跟踪)之后处理完成,因此看起来有点复杂。
无论如何,请注意 scheduler
参数的添加。向 Rx 添加任何形式的异步时,应始终提供 IScheduler 参数,但可以默认为 Scheduler.Default
,如下所示:
public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay)
{
return source.ThrottleOnTrue(delay, Scheduler.Default);
}
现在,可以使用 TestScheduler 提供的 "virtual time" 来证明 ThrottleOnTrue 可以正常工作(弹珠图略有不同)。这是一个显示立即发出 "F" 值的测试(当前时间 + 1 个调度刻度):
private static long SchedulerOffset = ReactiveTest.Created + ReactiveTest.Subscribed;
private static long NotificationOffset = ReactiveTest.Subscribed;
/// <summary>
/// source: F---F---F-C
/// expected: -F---F---F-C
/// </summary>
[Test]
public void ShouldDirectlyPassFalseValues()
{
var scheduler = new TestScheduler();
var source = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + 1, Notification.CreateOnCompleted<bool>())
};
var expected = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 2, Notification.CreateOnCompleted<bool>())
};
var xs = scheduler
.CreateColdObservable(source)
.ThrottleOnTrue(TimeSpan.FromMinutes(1), scheduler);
var observed = scheduler.Start(() => xs, TimeSpan.FromSeconds(3).Ticks + SchedulerOffset + 2);
CollectionAssert.AreEqual(expected, observed.Messages);
}
这是一个测试,显示 "T" 值在预期延迟后发出(并且取消了之前的 "T" 值):
/// <summary>
/// source: T---T---T-C
/// expected: --------{delay}-T-C
/// </summary>
[Test]
public void ShouldDelayAndThrottleTrueValues()
{
var scheduler = new TestScheduler();
var source = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext(true)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(true)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext(true)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + 1, Notification.CreateOnCompleted<bool>())
};
var expected = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + NotificationOffset + 1, Notification.CreateOnNext(true)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + NotificationOffset + 2, Notification.CreateOnCompleted<bool>())
};
var xs = scheduler
.CreateColdObservable(source)
.ThrottleOnTrue(TimeSpan.FromMinutes(1), scheduler);
var observed = scheduler.Start(() => xs, TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + SchedulerOffset);
CollectionAssert.AreEqual(expected, observed.Messages);
}
最后,这是一个显示 "T" 值被后续 "F" 值抵消的测试:
/// <summary>
/// source: F---T---F-C
/// expected: -F-------F-C
/// </summary>
[Test]
public void ShouldIgnoreTrueWhenFollowedByFalseWithinDelay()
{
var scheduler = new TestScheduler();
var source = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(true)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + 1, Notification.CreateOnCompleted<bool>())
};
var expected = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 2, Notification.CreateOnCompleted<bool>())
};
var xs = scheduler
.CreateColdObservable(source)
.ThrottleOnTrue(TimeSpan.FromMinutes(1), scheduler);
var observed = scheduler.Start(() => xs, TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + SchedulerOffset);
CollectionAssert.AreEqual(expected, observed.Messages);
}
很确定这就是您要找的东西,但由于您最初的弹珠图有点自相矛盾,所以我不能 100% 确定。不管怎样,我希望它能有所帮助。
你的弹珠图和问题描述的文字在我看来似乎不符。另外,您编写的代码似乎与其中任何一个都不匹配!
所以我认为您希望始终立即生成一个 false
值并发出。我还认为,当出现 true
时,您将延迟 delay
参数,如果同时没有其他值出现,则只输出 true
。如果有人这样做,您只需遵循相同的规则即可。
这主要基于您的代码。
你没有说为什么你认为你的代码不起作用,所以我测试了它。
这是我的代码,可将弹珠图转换为可观察对象,然后通过您的运算符运行它:
var marble = "F---T---F---T----T---T---F---C";
Observable
.Generate(
0,
x => marble[x] != 'C',
x => x + 1,
x => marble[x] == '-' ? (bool?)null : (marble[x] == 'T' ? true : false),
x => TimeSpan.FromSeconds(1.0))
.Where(x => x != null)
.Select(x => x.Value)
.ThrottleOnTrue(TimeSpan.FromSeconds(5.0))
.Timestamp()
您的代码产生了这个:
2020/06/15 01:16:23 +00:00 False
2020/06/15 01:16:31 +00:00 False
2020/06/15 01:16:40 +00:00 True
2020/06/15 01:16:48 +00:00 False
但观察从未结束。
我建议像这样重写您的代码:
public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay) =>
source
.Select(value =>
Observable
.Return(value)
.Delay(value ? delay : TimeSpan.Zero))
.Switch();
这样我得到了相同的结果,但它完成了。希望你能解决这个问题。
我正在尝试在 C# 中实现 bool 值的扩展。该实现应该直接传递假值,但是当接收到真值时,它应该延迟输出带有前缀延迟的真值。
弹珠图应该是:
|Delay| |Delay|
F---T---F---T----T---T---F---C
-F-----------------T------F---C
备注:
- 它应该包含(或不)不同的值
- 仅当真值来自源至少延迟持续时间时才应输出真值。
- 在输出中有一堆真实值的情况下,使用第一个计算延迟。
- 当源发出错误时,这应该立即传递到输出。
我目前的实现是
public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay)
{
return source.DistinctUntilChanged().Select(value => value
? Observable.Never<bool>().StartWith(true).Delay(delay)
: Observable.Never<bool>().StartWith(false))
.Switch();
}
但是好像不行,因为真值后没有正确取消假值后。我是 Rx 的新手,所以可能有更好的方法来实现这个扩展。
这将用于检查多个服务器应用程序的 IObservable CanProccessMoreJobs 属性,在快速输出更改后仅向服务器添加更多作业,至少延迟值为真值。
这是一个有趣的问题,我想知道它是否属于 XY 类问题。无论如何,鉴于您的弹珠图,您几乎肯定不想要 DistinctUntilChanged()
否则您将永远不会重复 "F"s.
我对此的方法(毫无疑问还有其他方法)是安排所需的输出,确保删除任何获得 "throttled out" 的值(即在延迟期内 T 后跟 F)。这可以这样实现:
public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay, IScheduler scheduler)
{
return Observable.Create<bool>(
observer =>
{
var serialDisposable = new SerialDisposable();
var delays = source
.Materialize()
.Scan(
(Notification: (Notification<bool>)null, Delay: 0L),
(seed, source) => source.Kind switch
{
NotificationKind.OnCompleted => (Notification.CreateOnCompleted<bool>(), seed.Delay),
NotificationKind.OnError => (Notification.CreateOnError<bool>(source.Exception), 0),
_ => source.Value
? (Notification.CreateOnNext(source.Value), delay.Ticks + 1)
: (Notification.CreateOnNext(source.Value), 1)
})
.Where(tuple => tuple.Notification != null)
.Publish();
// Emit values after the delay, cancelling an items that are throttled
var onNext = delays
.Where(tuple => tuple.Notification.Kind == NotificationKind.OnNext)
.Subscribe(tuple => serialDisposable.Disposable = scheduler.Schedule(scheduler.Now.AddTicks(tuple.Delay), () => observer.OnNext(tuple.Notification.Value)));
// Emit completion after delay of last item to be emitted
var onCompleted = delays
.Where(tuple => tuple.Notification.Kind == NotificationKind.OnCompleted)
.Subscribe(tuple => scheduler.Schedule(scheduler.Now.AddTicks(tuple.Delay), () => observer.OnCompleted()));
// Emit errors immediately, cancelling any pending items
var onError = delays
.Where(tuple => tuple.Notification.Kind == NotificationKind.OnError)
.Subscribe(tuple => serialDisposable.Disposable = scheduler.Schedule(TimeSpan.Zero, () => observer.OnError(tuple.Notification.Exception)));
return new CompositeDisposable(new IDisposable[] { onNext, onCompleted, onError, delays.Connect(), serialDisposable });
}
);
}
由于需要在先前延迟的项目(我们使用 Scan
元组跟踪)之后处理完成,因此看起来有点复杂。
无论如何,请注意 scheduler
参数的添加。向 Rx 添加任何形式的异步时,应始终提供 IScheduler 参数,但可以默认为 Scheduler.Default
,如下所示:
public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay)
{
return source.ThrottleOnTrue(delay, Scheduler.Default);
}
现在,可以使用 TestScheduler 提供的 "virtual time" 来证明 ThrottleOnTrue 可以正常工作(弹珠图略有不同)。这是一个显示立即发出 "F" 值的测试(当前时间 + 1 个调度刻度):
private static long SchedulerOffset = ReactiveTest.Created + ReactiveTest.Subscribed;
private static long NotificationOffset = ReactiveTest.Subscribed;
/// <summary>
/// source: F---F---F-C
/// expected: -F---F---F-C
/// </summary>
[Test]
public void ShouldDirectlyPassFalseValues()
{
var scheduler = new TestScheduler();
var source = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + 1, Notification.CreateOnCompleted<bool>())
};
var expected = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 2, Notification.CreateOnCompleted<bool>())
};
var xs = scheduler
.CreateColdObservable(source)
.ThrottleOnTrue(TimeSpan.FromMinutes(1), scheduler);
var observed = scheduler.Start(() => xs, TimeSpan.FromSeconds(3).Ticks + SchedulerOffset + 2);
CollectionAssert.AreEqual(expected, observed.Messages);
}
这是一个测试,显示 "T" 值在预期延迟后发出(并且取消了之前的 "T" 值):
/// <summary>
/// source: T---T---T-C
/// expected: --------{delay}-T-C
/// </summary>
[Test]
public void ShouldDelayAndThrottleTrueValues()
{
var scheduler = new TestScheduler();
var source = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext(true)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(true)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext(true)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + 1, Notification.CreateOnCompleted<bool>())
};
var expected = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + NotificationOffset + 1, Notification.CreateOnNext(true)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + NotificationOffset + 2, Notification.CreateOnCompleted<bool>())
};
var xs = scheduler
.CreateColdObservable(source)
.ThrottleOnTrue(TimeSpan.FromMinutes(1), scheduler);
var observed = scheduler.Start(() => xs, TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + SchedulerOffset);
CollectionAssert.AreEqual(expected, observed.Messages);
}
最后,这是一个显示 "T" 值被后续 "F" 值抵消的测试:
/// <summary>
/// source: F---T---F-C
/// expected: -F-------F-C
/// </summary>
[Test]
public void ShouldIgnoreTrueWhenFollowedByFalseWithinDelay()
{
var scheduler = new TestScheduler();
var source = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(true)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + 1, Notification.CreateOnCompleted<bool>())
};
var expected = new[]
{
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(1).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 1, Notification.CreateOnNext(false)),
new Recorded<Notification<bool>>(TimeSpan.FromSeconds(3).Ticks + NotificationOffset + 2, Notification.CreateOnCompleted<bool>())
};
var xs = scheduler
.CreateColdObservable(source)
.ThrottleOnTrue(TimeSpan.FromMinutes(1), scheduler);
var observed = scheduler.Start(() => xs, TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromMinutes(1).Ticks + SchedulerOffset);
CollectionAssert.AreEqual(expected, observed.Messages);
}
很确定这就是您要找的东西,但由于您最初的弹珠图有点自相矛盾,所以我不能 100% 确定。不管怎样,我希望它能有所帮助。
你的弹珠图和问题描述的文字在我看来似乎不符。另外,您编写的代码似乎与其中任何一个都不匹配!
所以我认为您希望始终立即生成一个 false
值并发出。我还认为,当出现 true
时,您将延迟 delay
参数,如果同时没有其他值出现,则只输出 true
。如果有人这样做,您只需遵循相同的规则即可。
这主要基于您的代码。
你没有说为什么你认为你的代码不起作用,所以我测试了它。
这是我的代码,可将弹珠图转换为可观察对象,然后通过您的运算符运行它:
var marble = "F---T---F---T----T---T---F---C";
Observable
.Generate(
0,
x => marble[x] != 'C',
x => x + 1,
x => marble[x] == '-' ? (bool?)null : (marble[x] == 'T' ? true : false),
x => TimeSpan.FromSeconds(1.0))
.Where(x => x != null)
.Select(x => x.Value)
.ThrottleOnTrue(TimeSpan.FromSeconds(5.0))
.Timestamp()
您的代码产生了这个:
2020/06/15 01:16:23 +00:00 False 2020/06/15 01:16:31 +00:00 False 2020/06/15 01:16:40 +00:00 True 2020/06/15 01:16:48 +00:00 False
但观察从未结束。
我建议像这样重写您的代码:
public static IObservable<bool> ThrottleOnTrue(this IObservable<bool> source, TimeSpan delay) =>
source
.Select(value =>
Observable
.Return(value)
.Delay(value ? delay : TimeSpan.Zero))
.Switch();
这样我得到了相同的结果,但它完成了。希望你能解决这个问题。