如何在超时扩展中获取最新的序列元素?

How to get latest sequence element inside the Timeout extension?

我想知道在超时触发之前获取序列中最新元素的最佳方法是什么?

我有一个代码可以不时对远程服务执行 ping 操作,我希望能够识别出已经离线的服务。

使用超时扩展我想到了这个:

heartbeatResponseObservable.Timeout(Timeout, Observable.Return(new HeartbeatBusMessage.Timeout()))

这有点工作,但它不允许我找到哪个服务已经消失。 我想要的是将流中的最新消息作为参数的超时扩展,以在它产生的错误消息中提供一些信息。

如何获取超时扩展中的最新序列元素?

public static IObservable<T> TimeOutExtension<T>(
    this IObservable<T> source, 
    TimeSpan timeSpan)
{
    // On Timeout complete with an empty Observable.
    var completeOnTimeout = source
                                .Timeout(timeSpan)
                                .Catch<T, TimeoutException>(ex => Observable.Empty<T>());

    // Join the source w/ the empty Observable created on timeout.
    var beforeTimeout =
        source.Join(completeOnTimeout, 
        _ => source, 
        _ => completeOnTimeout, 
        (s, c) => s);

    // Return last
    return beforeTimeout.LastAsync();
}

可以这样使用:

// Create 10 events quickly, then once every two seconds.
var source =
    Observable.Interval(TimeSpan.FromMilliseconds(100))
        .Take(10)
        .Concat(Observable.Interval(TimeSpan.FromSeconds(2)));

// Set a timeout of 1 second.
var last = source.TimeOutExtension(TimeSpan.FromSeconds(1));

last.Subscribe(Console.WriteLine); // outputs 9