如何在超时扩展中获取最新的序列元素?
How to get latest sequence element inside the Timeout extension?
我想知道在超时触发之前获取序列中最新元素的最佳方法是什么?
我有一个代码可以不时对远程服务执行 ping 操作,我希望能够识别出已经离线的服务。
使用超时扩展我想到了这个:
heartbeatResponseObservable.Timeout(Timeout, Observable.Return(new HeartbeatBusMessage.Timeout()))
这有点工作,但它不允许我找到哪个服务已经消失。
我想要的是将流中的最新消息作为参数的超时扩展,以在它产生的错误消息中提供一些信息。
如何获取超时扩展中的最新序列元素?
public static IObservable<T> TimeOutExtension<T>(
this IObservable<T> source,
TimeSpan timeSpan)
{
// On Timeout complete with an empty Observable.
var completeOnTimeout = source
.Timeout(timeSpan)
.Catch<T, TimeoutException>(ex => Observable.Empty<T>());
// Join the source w/ the empty Observable created on timeout.
var beforeTimeout =
source.Join(completeOnTimeout,
_ => source,
_ => completeOnTimeout,
(s, c) => s);
// Return last
return beforeTimeout.LastAsync();
}
可以这样使用:
// Create 10 events quickly, then once every two seconds.
var source =
Observable.Interval(TimeSpan.FromMilliseconds(100))
.Take(10)
.Concat(Observable.Interval(TimeSpan.FromSeconds(2)));
// Set a timeout of 1 second.
var last = source.TimeOutExtension(TimeSpan.FromSeconds(1));
last.Subscribe(Console.WriteLine); // outputs 9
我想知道在超时触发之前获取序列中最新元素的最佳方法是什么?
我有一个代码可以不时对远程服务执行 ping 操作,我希望能够识别出已经离线的服务。
使用超时扩展我想到了这个:
heartbeatResponseObservable.Timeout(Timeout, Observable.Return(new HeartbeatBusMessage.Timeout()))
这有点工作,但它不允许我找到哪个服务已经消失。 我想要的是将流中的最新消息作为参数的超时扩展,以在它产生的错误消息中提供一些信息。
如何获取超时扩展中的最新序列元素?
public static IObservable<T> TimeOutExtension<T>(
this IObservable<T> source,
TimeSpan timeSpan)
{
// On Timeout complete with an empty Observable.
var completeOnTimeout = source
.Timeout(timeSpan)
.Catch<T, TimeoutException>(ex => Observable.Empty<T>());
// Join the source w/ the empty Observable created on timeout.
var beforeTimeout =
source.Join(completeOnTimeout,
_ => source,
_ => completeOnTimeout,
(s, c) => s);
// Return last
return beforeTimeout.LastAsync();
}
可以这样使用:
// Create 10 events quickly, then once every two seconds.
var source =
Observable.Interval(TimeSpan.FromMilliseconds(100))
.Take(10)
.Concat(Observable.Interval(TimeSpan.FromSeconds(2)));
// Set a timeout of 1 second.
var last = source.TimeOutExtension(TimeSpan.FromSeconds(1));
last.Subscribe(Console.WriteLine); // outputs 9