Rx.NET取一个元素,过一段时间再订阅
Rx.NET Take an element and subscribe again after some time
我需要一种节流,它的工作原理有点不同。我需要从序列中获取一个元素,取消订阅并在 1 秒内再次订阅。换句话说,我想在获取第一个元素后的 1 秒内忽略所有元素:
Input: (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
Output: (1) --------------------------------- (4) --------- (5) ----------- ...
如何用 Rx.NET 实现这个简单的事情?
试试这个:
Input
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
这是一个测试:
var query =
Observable
.Interval(TimeSpan.FromSeconds(0.2))
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
这产生了:
0
5
10
14
19
24
29
34
39
从 10 跳到 14 只是使用多线程的结果,而不是查询中的错误。
@Enigmativity 的回答不完全符合规范。它可能适用于你想要的东西。
他的答案定义了 1 秒 windows,并从每个 windows 中取第一个。但是,这并不能保证您在项目之间保持一秒钟的沉默。考虑这种情况:
t : ---------1---------2---------3
source: ------1---2------3---4----5--|
window: ---------|---------|---------|
spec : ------1----------3-----------|
enigma: ------1---2----------4-------|
答案意味着您希望在第 1 项之后有一秒钟什么都没有。之后的下一项是 3,然后一直保持沉默。这是编码的测试:
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1100.MsTicks(), 2),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnNext(2200.MsTicks(), 4),
RxTest.OnNext(2600.MsTicks(), 5),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var expectedResults = scheduler.CreateHotObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var target = source
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
.SelectMany(xs => xs.Take(1));
var observer = scheduler.CreateObserver<int>();
target.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
我认为最好的解决方法是基于 Scan
的带有时间戳的解决方案。您基本上将最后一条合法消息保存在内存中并带有时间戳,如果新消息早一秒,则发出。否则,不要:
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
{
return TrueThrottle<T>(source, span, Scheduler.Default);
}
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
{
return source
.Timestamp(scheduler)
.Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
? item
: state
)
.DistinctUntilChanged()
.Select(t => t.Value);
}
注意:测试代码使用 Nuget Microsoft.Reactive.Testing
和以下助手 class:
public static class RxTest
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
}
public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
}
public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
}
}
我需要一种节流,它的工作原理有点不同。我需要从序列中获取一个元素,取消订阅并在 1 秒内再次订阅。换句话说,我想在获取第一个元素后的 1 秒内忽略所有元素:
Input: (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
Output: (1) --------------------------------- (4) --------- (5) ----------- ...
如何用 Rx.NET 实现这个简单的事情?
试试这个:
Input
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
这是一个测试:
var query =
Observable
.Interval(TimeSpan.FromSeconds(0.2))
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
这产生了:
0 5 10 14 19 24 29 34 39
从 10 跳到 14 只是使用多线程的结果,而不是查询中的错误。
@Enigmativity 的回答不完全符合规范。它可能适用于你想要的东西。
他的答案定义了 1 秒 windows,并从每个 windows 中取第一个。但是,这并不能保证您在项目之间保持一秒钟的沉默。考虑这种情况:
t : ---------1---------2---------3
source: ------1---2------3---4----5--|
window: ---------|---------|---------|
spec : ------1----------3-----------|
enigma: ------1---2----------4-------|
答案意味着您希望在第 1 项之后有一秒钟什么都没有。之后的下一项是 3,然后一直保持沉默。这是编码的测试:
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1100.MsTicks(), 2),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnNext(2200.MsTicks(), 4),
RxTest.OnNext(2600.MsTicks(), 5),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var expectedResults = scheduler.CreateHotObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var target = source
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
.SelectMany(xs => xs.Take(1));
var observer = scheduler.CreateObserver<int>();
target.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
我认为最好的解决方法是基于 Scan
的带有时间戳的解决方案。您基本上将最后一条合法消息保存在内存中并带有时间戳,如果新消息早一秒,则发出。否则,不要:
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
{
return TrueThrottle<T>(source, span, Scheduler.Default);
}
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
{
return source
.Timestamp(scheduler)
.Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
? item
: state
)
.DistinctUntilChanged()
.Select(t => t.Value);
}
注意:测试代码使用 Nuget Microsoft.Reactive.Testing
和以下助手 class:
public static class RxTest
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
}
public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
}
public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
}
}