是否可以向 Rx.Net 超时运算符添加自定义消息

Is it possible to add a custom message to the Rx.Net Timeout operator

我正在尝试从设备读取数据流并在其中找到有效数据。 它是一组零,中间有一些数据。 类似于:

0,0,0,1,2,3,2,1,0,0,0,0,1,2,3,2,1,0,0,0 等等。

处理后发出的是非零数据组的总和,例如

9,9

我的代码中有两个超时运算符。不幸的是,他们都有相同的信息:

System.TimeoutException: The operation has timed out.

所以我无法向用户提供更多上下文,就好像错误发生的地方一样,例如数据流静默或我们有一个数据流 运行 但我们只得到零。

有什么方法可以添加我自己的消息而不是常规的 TimeoutException 消息?

这是我的伪代码:

var _handle = 
        _source.Select(x => x.Item1)
        .Timeout(TimeSpan.FromSeconds(5)) // If the stream goes silent here the timeout is raised.
        .EmitSumOfDataNonZeroPoints()
        .Timeout(TimeSpan.FromSeconds(15)) // If no valid group of nonzero data is found the timeout is raised.
        .Subscribe(
            someProcessing => {},
            ex => {
                _errorInObservableMessage = ex.Message;
            },
            () => {}
        );

Timeout 运算符有一个重载,您可以在其中传递一个 other 可观察对象,它将在超时的情况下继续:

// Applies a timeout policy for each element in the observable sequence. If the
// next element isn't received within the specified timeout duration starting from
// its predecessor, the other observable sequence is used to produce future messages
// from that point on.
public static IObservable<TSource> Timeout<TSource>(this IObservable<TSource> source,
    TimeSpan dueTime, IObservable<TSource> other);

您可以直接使用此重载,或将其用作接受 timeoutMessage 参数的自定义 Timeout 运算符的构建块:

/// <summary>In case of timeout propagates a TimeoutException with custom message.</summary>
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source, TimeSpan dueTime, string timeoutMessage)
{
    return source.Timeout(dueTime,
        Observable.Throw<TSource>(new TimeoutException(timeoutMessage)));
}