在响应式扩展中使用异常作为数据传输对象

Using exception as a data transfer object in reactive extensions

Rx中主要有OnNext<T>OnErrorOnComplete三个方法,只有OnNext可以传递数据。

我有时间戳数据点和数据流,其中有通过 OnNext 传递的正常值,占数据点的 99% 以上。但是,有些数据点是例外的,例如重写历史的延迟数据或乱序数据点。从逻辑上讲,它是需要特殊处理的数据的下一个 事件 ,但不是按预期顺序排列的下一个数据。

在这种情况下 "normal" 是通过 OnError 通道传递异常值吗?我可以用 T 类型的 属性 定义 OutOfOrderDataExcpetion<T> : Exception 并在观察者中处理它。鉴于异常值很少见,而且我没有 throw/raise 异常,而是构造它们并将它们用作 DTO,性能应该不是问题。

我担心的是调用的方法是OnError而不是OnException,但我的数据不是错误,只是需要特殊处理。同时,如果我将我的可观察对象暴露给对 OutOfOrderExcpetion<T> 一无所知的外部代码,外部代码将无法专门捕获它,这将是一个错误。

我可以遵循事件溯源模式并将每个数据事件包装在一个带有命令和有效负载的包装器中,但随后我需要解压缩每个命令,并且在 99%+ 的情况下它将是相同的命令"next".使用 OnError 作为所有其他命令的通道为我提供了几乎相同的事件源模式和简化的处理。

除了命名问题,您能否添加一些论据来阻止我使用此设计?或者这是对异常的正确使用?

绝对不要使用OnError来传递数据。正如你自己暗示的那样,坏事可能会发生。

我不太明白使用包装器有什么问题。您说您不想编写展开代码,但可以肯定的是,不愿意是不做正确事情的一个非常奇怪的原因。这有 objective 个原因吗?

您是否担心解包代码会使您的程序混乱?但是你不必把它们全部写在一个地方,你可以把它分开并且仍然让一切看起来都很好。类似于:

IObservable<Wrapper> source = ...
var processed = 
    source
    .HandleNormalCase( unwrappedNormalData => { ... } )
    .HandleAbnormalCase( unwrappedExnData => { ... } )

其中 .HandleXxx 是执行展开的扩展方法。

Rx observables 有一个行为契约。您可以保证零次或多次 OnNext 调用,然后是一次且仅一次 OnErrorOnCompleted.

如果你使用 OnError 那么你就是说 "something went wrong and now the sequence has ended".

所以你真的不能使用 OnError 因为它不是 "error channel".

如果你这样做,使用包装器会很简单:

normals
    .Select(data => new { normal = true, data })
    .Merge(
        specials
            .Select(data => new { normal = false, data })