Rx 的连接状态和错误元数据最佳实践

Connection state and error metadata best practice with Rx

系统

我正在构建一个服务器端流式计算引擎,为一家金融服务公司对实时市场数据执行实时计算。

这个想法是计算服务器获得与各种市场数据源(主要是彭博社)的连接,并在新输入(价格、汇率等)到达时重新计算各种相对复杂的计算。

然后,这些计算结果将被推送到使用 SignalR 连接到服务器的各种客户端(WPF 桌面)。在客户端和服务器上,我将使用 Rx 和 IObservable 来实现用于计算和级联下游的管道 activity。

问题

事实是,计算的有效状态可能是 0.3995,但也可能是 "Waiting for price" 甚至 "Server down"。

我不想使用内置的 Rx 错误系统,因为它看起来很脆弱。一旦遇到任何异常,IObservable 将永远终止并且必须重新订阅才能恢复。我希望我的流是健壮和持久的,并且预期可能的可恢复错误状态是数据片段而不是异常。

我目前的想法

我正在尝试将流中的 return 数据包装成一种 monad,如下所示:

有没有人有更好的建议?或者一些可以帮助我正确解决问题的建议?

    public enum TransientState 
    {
        NotReady = 0,
        Ok = 1,
        Faulted = 2,
        Fatal = 3
    }

    public struct RxMessage<T> 
    {
        public TransientState State { get; set; }
        public DateTime TimestampUtc { get; set; }
        public string Fault { get; set; }
        public T Payload { get; set; }
    }

您的方法很有道理,而且肯定会奏效。

我可能更喜欢单独的状态流。使用单独的流,您不会继续推送相同的状态数据,客户端可以分开处理新数据和状态更改。

例如:

var statusStream = new Subject<TransientState>();
var tickStream = new Subject<float>();

var finalStream = statusStream
    .Select(s => s == TransientState.Ok ? tickStream : Observable.Never<float>() )
    .Switch();

finalStream.Dump(); //linqpad

statusStream.OnNext(TransientState.NotReady);
tickStream.OnNext(0);
statusStream.OnNext(TransientState.Ok);
tickStream.OnNext(1);
tickStream.OnNext(2);
tickStream.OnNext(3);
statusStream.OnNext(TransientState.Faulted);
tickStream.OnNext(4);
tickStream.OnNext(5);
statusStream.OnNext(TransientState.Ok);
tickStream.OnNext(6);

输出为1 2 3 6,只有状态正常时输出的数字。