.NET ReactiveExtensions:使用具有可变时间跨度的 Sample()
.NET ReactiveExtensions: Use Sample() with variable timespan
鉴于高频可观察数据流,我只想每 XX 秒发出一个项目。
这通常在 RX 中使用 .Sample(TimeSpan.FromSeconds(XX))
但是...我希望时间间隔根据数据的某些 属性 而变化。
假设我的数据是:
class 位置
{
...
public int 速度;
}
如果 Speed 小于 100,我想每 5 秒发送一次数据。如果速度高于 100,则应每 2 秒。
现成的 Sample() 是否可行,还是我需要自己构建一些东西?
让我知道这是否可行:
var query =
source
.Publish(ss =>
ss
.Select(s => s.Speed < 100 ? 5.0 : 2.0)
.Distinct()
.Select(x => ss.Sample(TimeSpan.FromSeconds(x))));
这是一个低级实现,利用 System.Reactive.Concurrency.Scheduler.SchedulePeriodic
扩展方法作为计时器。
public static IObservable<TSource> Sample<TSource>(this IObservable<TSource> source,
Func<TSource, TimeSpan> intervalSelector, IScheduler scheduler = null)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (intervalSelector == null)
throw new ArgumentNullException(nameof(intervalSelector));
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<TSource>(observer =>
{
TimeSpan currentInterval = Timeout.InfiniteTimeSpan;
IDisposable timer = null;
TSource latestItem = default;
bool latestEmitted = true;
object locker = new object();
Action periodicAction = () =>
{
TSource itemToEmit;
lock (locker)
{
if (latestEmitted) return;
itemToEmit = latestItem;
latestItem = default;
latestEmitted = true;
}
observer.OnNext(itemToEmit);
};
return source.Subscribe(onNext: item =>
{
lock (locker)
{
latestItem = item;
latestEmitted = false;
}
var newInterval = intervalSelector(item);
if (newInterval != currentInterval)
{
timer?.Dispose();
timer = scheduler.SchedulePeriodic(newInterval, periodicAction);
currentInterval = newInterval;
}
}, onError: ex =>
{
timer?.Dispose();
observer.OnError(ex);
}, onCompleted: () =>
{
timer?.Dispose();
observer.OnCompleted();
});
});
}
用法示例:
observable.Sample(x => TimeSpan.FromSeconds(x.Speed < 100 ? 5.0 : 2.0));
每次 intervalSelector
回调 returns 不同的间隔时,计时器都会重新启动。在极端情况下,每个新项目都会更改间隔,那么此自定义运算符的行为将更像内置的 Throttle
than the built-in Sample
.
Unlike Sample
, Throttle
's period is a sliding window. Each time Throttle
receives a value, the window is reset. (citation)
鉴于高频可观察数据流,我只想每 XX 秒发出一个项目。
这通常在 RX 中使用 .Sample(TimeSpan.FromSeconds(XX))
但是...我希望时间间隔根据数据的某些 属性 而变化。
假设我的数据是:
class 位置 { ... public int 速度; }
如果 Speed 小于 100,我想每 5 秒发送一次数据。如果速度高于 100,则应每 2 秒。
现成的 Sample() 是否可行,还是我需要自己构建一些东西?
让我知道这是否可行:
var query =
source
.Publish(ss =>
ss
.Select(s => s.Speed < 100 ? 5.0 : 2.0)
.Distinct()
.Select(x => ss.Sample(TimeSpan.FromSeconds(x))));
这是一个低级实现,利用 System.Reactive.Concurrency.Scheduler.SchedulePeriodic
扩展方法作为计时器。
public static IObservable<TSource> Sample<TSource>(this IObservable<TSource> source,
Func<TSource, TimeSpan> intervalSelector, IScheduler scheduler = null)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (intervalSelector == null)
throw new ArgumentNullException(nameof(intervalSelector));
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<TSource>(observer =>
{
TimeSpan currentInterval = Timeout.InfiniteTimeSpan;
IDisposable timer = null;
TSource latestItem = default;
bool latestEmitted = true;
object locker = new object();
Action periodicAction = () =>
{
TSource itemToEmit;
lock (locker)
{
if (latestEmitted) return;
itemToEmit = latestItem;
latestItem = default;
latestEmitted = true;
}
observer.OnNext(itemToEmit);
};
return source.Subscribe(onNext: item =>
{
lock (locker)
{
latestItem = item;
latestEmitted = false;
}
var newInterval = intervalSelector(item);
if (newInterval != currentInterval)
{
timer?.Dispose();
timer = scheduler.SchedulePeriodic(newInterval, periodicAction);
currentInterval = newInterval;
}
}, onError: ex =>
{
timer?.Dispose();
observer.OnError(ex);
}, onCompleted: () =>
{
timer?.Dispose();
observer.OnCompleted();
});
});
}
用法示例:
observable.Sample(x => TimeSpan.FromSeconds(x.Speed < 100 ? 5.0 : 2.0));
每次 intervalSelector
回调 returns 不同的间隔时,计时器都会重新启动。在极端情况下,每个新项目都会更改间隔,那么此自定义运算符的行为将更像内置的 Throttle
than the built-in Sample
.
Unlike
Sample
,Throttle
's period is a sliding window. Each timeThrottle
receives a value, the window is reset. (citation)