Rx 的连接状态和错误元数据最佳实践
Connection state and error metadata best practice with Rx
系统
我正在构建一个服务器端流式计算引擎,为一家金融服务公司对实时市场数据执行实时计算。
这个想法是计算服务器获得与各种市场数据源(主要是彭博社)的连接,并在新输入(价格、汇率等)到达时重新计算各种相对复杂的计算。
然后,这些计算结果将被推送到使用 SignalR 连接到服务器的各种客户端(WPF 桌面)。在客户端和服务器上,我将使用 Rx 和 IObservable 来实现用于计算和级联下游的管道 activity。
问题
事实是,计算的有效状态可能是 0.3995,但也可能是 "Waiting for price" 甚至 "Server down"。
我不想使用内置的 Rx 错误系统,因为它看起来很脆弱。一旦遇到任何异常,IObservable 将永远终止并且必须重新订阅才能恢复。我希望我的流是健壮和持久的,并且预期可能的可恢复错误状态是数据片段而不是异常。
我目前的想法
我正在尝试将流中的 return 数据包装成一种 monad,如下所示:
有没有人有更好的建议?或者一些可以帮助我正确解决问题的建议?
public enum TransientState
{
NotReady = 0,
Ok = 1,
Faulted = 2,
Fatal = 3
}
public struct RxMessage<T>
{
public TransientState State { get; set; }
public DateTime TimestampUtc { get; set; }
public string Fault { get; set; }
public T Payload { get; set; }
}
您的方法很有道理,而且肯定会奏效。
我可能更喜欢单独的状态流。使用单独的流,您不会继续推送相同的状态数据,客户端可以分开处理新数据和状态更改。
例如:
var statusStream = new Subject<TransientState>();
var tickStream = new Subject<float>();
var finalStream = statusStream
.Select(s => s == TransientState.Ok ? tickStream : Observable.Never<float>() )
.Switch();
finalStream.Dump(); //linqpad
statusStream.OnNext(TransientState.NotReady);
tickStream.OnNext(0);
statusStream.OnNext(TransientState.Ok);
tickStream.OnNext(1);
tickStream.OnNext(2);
tickStream.OnNext(3);
statusStream.OnNext(TransientState.Faulted);
tickStream.OnNext(4);
tickStream.OnNext(5);
statusStream.OnNext(TransientState.Ok);
tickStream.OnNext(6);
输出为1 2 3 6
,只有状态正常时输出的数字。
系统
我正在构建一个服务器端流式计算引擎,为一家金融服务公司对实时市场数据执行实时计算。
这个想法是计算服务器获得与各种市场数据源(主要是彭博社)的连接,并在新输入(价格、汇率等)到达时重新计算各种相对复杂的计算。
然后,这些计算结果将被推送到使用 SignalR 连接到服务器的各种客户端(WPF 桌面)。在客户端和服务器上,我将使用 Rx 和 IObservable 来实现用于计算和级联下游的管道 activity。
问题
事实是,计算的有效状态可能是 0.3995,但也可能是 "Waiting for price" 甚至 "Server down"。
我不想使用内置的 Rx 错误系统,因为它看起来很脆弱。一旦遇到任何异常,IObservable 将永远终止并且必须重新订阅才能恢复。我希望我的流是健壮和持久的,并且预期可能的可恢复错误状态是数据片段而不是异常。
我目前的想法
我正在尝试将流中的 return 数据包装成一种 monad,如下所示:
有没有人有更好的建议?或者一些可以帮助我正确解决问题的建议?
public enum TransientState
{
NotReady = 0,
Ok = 1,
Faulted = 2,
Fatal = 3
}
public struct RxMessage<T>
{
public TransientState State { get; set; }
public DateTime TimestampUtc { get; set; }
public string Fault { get; set; }
public T Payload { get; set; }
}
您的方法很有道理,而且肯定会奏效。
我可能更喜欢单独的状态流。使用单独的流,您不会继续推送相同的状态数据,客户端可以分开处理新数据和状态更改。
例如:
var statusStream = new Subject<TransientState>();
var tickStream = new Subject<float>();
var finalStream = statusStream
.Select(s => s == TransientState.Ok ? tickStream : Observable.Never<float>() )
.Switch();
finalStream.Dump(); //linqpad
statusStream.OnNext(TransientState.NotReady);
tickStream.OnNext(0);
statusStream.OnNext(TransientState.Ok);
tickStream.OnNext(1);
tickStream.OnNext(2);
tickStream.OnNext(3);
statusStream.OnNext(TransientState.Faulted);
tickStream.OnNext(4);
tickStream.OnNext(5);
statusStream.OnNext(TransientState.Ok);
tickStream.OnNext(6);
输出为1 2 3 6
,只有状态正常时输出的数字。