反应性扩展 - 如何进行可变速率轮询?
Reactive extensions - how to do variable rate polling?
我想知道如何 poll/invoke 在设定的时间间隔内使用方法。我也希望能够改变这个固定的时间间隔。所以像这样。
[Reactive]
public TimeSpan Rate { get; set; }
IObservable<TimeSpan> rate = this.WhenAnyValue(vm => vm.Rate);
// poll should emit at the current rate set in `rate observable`
rate.Poll().InvokeCommand(cmd);
在大理石图中
rate ---4----------2--------∞----------4----------------
Poll ---X---X---X--X-X-X-X-XX----------X---X---X---X---X
X - the time when method is executed
请注意,rate
中的新值应强制 Poll
立即发出并取消任何先前应该发出的值。
我试图写一个 Poll Operator
。但是,当我尝试传递一个大 TimeSpan
时它会抱怨。
public static IObservable<TimeSpan> Poll(this IObservable<TimeSpan> sourceObservable, IScheduler scheduler = null)
{
scheduler = scheduler ?? NewThreadScheduler.Default;
return Observable.Create<TimeSpan>(observer =>
{
return scheduler.ScheduleAsync(async (s, ct) =>
{
var timerTokenSource = new CancellationTokenSource();
var compositeTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timerTokenSource.Token, ct);
TimeSpan interval = TimeSpan.MaxValue;
sourceObservable.Subscribe(t =>
{
interval = t;
timerTokenSource.Cancel();
timerTokenSource = new CancellationTokenSource();
compositeTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timerTokenSource.Token, ct);
},
ct);
while (!ct.IsCancellationRequested)
{
await s.Sleep(interval, compositeTokenSource.Token);
observer.OnNext(interval);
}
});
});
}
我想这就是你想要的:
public static IObservable<TimeSpan> Poll(this IObservable<TimeSpan> sourceObservable, IScheduler scheduler = null)
{
scheduler = scheduler ?? NewThreadScheduler.Default;
return
sourceObservable
.Select(ts => Observable.Timer(TimeSpan.Zero, ts).Select(x => ts))
.ObserveOn(scheduler)
.Switch();
}
我不知道你为什么要返回 TimeSpan
。这是为什么?
我想知道如何 poll/invoke 在设定的时间间隔内使用方法。我也希望能够改变这个固定的时间间隔。所以像这样。
[Reactive]
public TimeSpan Rate { get; set; }
IObservable<TimeSpan> rate = this.WhenAnyValue(vm => vm.Rate);
// poll should emit at the current rate set in `rate observable`
rate.Poll().InvokeCommand(cmd);
在大理石图中
rate ---4----------2--------∞----------4----------------
Poll ---X---X---X--X-X-X-X-XX----------X---X---X---X---X
X - the time when method is executed
请注意,rate
中的新值应强制 Poll
立即发出并取消任何先前应该发出的值。
我试图写一个 Poll Operator
。但是,当我尝试传递一个大 TimeSpan
时它会抱怨。
public static IObservable<TimeSpan> Poll(this IObservable<TimeSpan> sourceObservable, IScheduler scheduler = null)
{
scheduler = scheduler ?? NewThreadScheduler.Default;
return Observable.Create<TimeSpan>(observer =>
{
return scheduler.ScheduleAsync(async (s, ct) =>
{
var timerTokenSource = new CancellationTokenSource();
var compositeTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timerTokenSource.Token, ct);
TimeSpan interval = TimeSpan.MaxValue;
sourceObservable.Subscribe(t =>
{
interval = t;
timerTokenSource.Cancel();
timerTokenSource = new CancellationTokenSource();
compositeTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timerTokenSource.Token, ct);
},
ct);
while (!ct.IsCancellationRequested)
{
await s.Sleep(interval, compositeTokenSource.Token);
observer.OnNext(interval);
}
});
});
}
我想这就是你想要的:
public static IObservable<TimeSpan> Poll(this IObservable<TimeSpan> sourceObservable, IScheduler scheduler = null)
{
scheduler = scheduler ?? NewThreadScheduler.Default;
return
sourceObservable
.Select(ts => Observable.Timer(TimeSpan.Zero, ts).Select(x => ts))
.ObserveOn(scheduler)
.Switch();
}
我不知道你为什么要返回 TimeSpan
。这是为什么?