将 IObservable<T> 转换为另一个根据输入序列的元素出错或完成的序列
Transform IObservable<T> into another sequence that faults or completes based on elements of the input sequence
我正在实现一个进程间消息传递系统,例如客户端可以向服务器请求某些数据。服务器需要能够以部分响应的形式发回其回复,并且还需要能够在发生任何异常时通知客户端。
目前,我通过 3 种消息类型执行此操作:
class PartialResponse : ResponseMessage { ... }
class ResponseError : ResponseMessage { ... }
class ResponseComplete : ResponseMessage { ... ]
所以例如客户端请求数据,服务器发回 0-N PartialResponse 消息,后跟 ResponseError 或 ResponseComplete。
我正在使用的库(以 NetMQ 作为其传输层的 Obvs)会将所有可能的消息流公开为
IObservable<ResponseMessage>
虽然这个可观察的流永远不会完成,而且我相信也不会出错(除非可能是一些 Obvs/NetMQ 内部异常或类似的)。
我想将其转换为 IObservable,它在原始流推送 ResponseComplete 消息时完成,并在遇到 ResponseError 消息或输入流中的实际错误时出错。例如。类似于:
IObservable<PartialResponse> transform(IObservable<ResponseMessage> input)
{
var subject = new Subject<PartialResponse>();
input.Subscribe(
x =>
{
if(x is PartialResponse r)
subject.OnNext(r);
else if(x is ResponseComplete)
subject.OnCompleted();
else if(x is ResponseError err)
subject.OnError(new Exception(err?.ToString()));
else
throw new InvalidOperationException();
},
ex =>
{
subject.OnError(ex);
}
);
return subject;
}
此代码实际上应该可以工作,但可能非常糟糕 - 尤其是因为它直接订阅了输入可观察序列。
有什么better/cleaner方法可以转换可观察序列吗?
这是@Enigmativity 的充实回答:
var input = new Subject<ResponseMessage>();
var partialResponseObservable = input
.Select(msg =>
(msg is PartialResponse r)
? Notification.CreateOnNext(r)
: (msg is ResponseComplete)
? Notification.CreateOnCompleted<PartialResponse>()
: (msg is ResponseError err)
? Notification.CreateOnError<PartialResponse>(new Exception(err?.ToString()))
: throw new InvalidOperationException()
)
.Dematerialize();
或类型匹配(可能读起来更好):
var partialResponseObservable = input
.Select(msg =>
{
switch(msg)
{
case PartialResponse r:
return Notification.CreateOnNext(r);
case ResponseComplete rc:
return Notification.CreateOnCompleted<PartialResponse>();
case ResponseError err:
return Notification.CreateOnError<PartialResponse>(new Exception(err?.ToString()));
default:
throw new InvalidOperationException();
}
})
.Dematerialize();
我正在实现一个进程间消息传递系统,例如客户端可以向服务器请求某些数据。服务器需要能够以部分响应的形式发回其回复,并且还需要能够在发生任何异常时通知客户端。
目前,我通过 3 种消息类型执行此操作:
class PartialResponse : ResponseMessage { ... }
class ResponseError : ResponseMessage { ... }
class ResponseComplete : ResponseMessage { ... ]
所以例如客户端请求数据,服务器发回 0-N PartialResponse 消息,后跟 ResponseError 或 ResponseComplete。
我正在使用的库(以 NetMQ 作为其传输层的 Obvs)会将所有可能的消息流公开为
IObservable<ResponseMessage>
虽然这个可观察的流永远不会完成,而且我相信也不会出错(除非可能是一些 Obvs/NetMQ 内部异常或类似的)。
我想将其转换为 IObservable
IObservable<PartialResponse> transform(IObservable<ResponseMessage> input)
{
var subject = new Subject<PartialResponse>();
input.Subscribe(
x =>
{
if(x is PartialResponse r)
subject.OnNext(r);
else if(x is ResponseComplete)
subject.OnCompleted();
else if(x is ResponseError err)
subject.OnError(new Exception(err?.ToString()));
else
throw new InvalidOperationException();
},
ex =>
{
subject.OnError(ex);
}
);
return subject;
}
此代码实际上应该可以工作,但可能非常糟糕 - 尤其是因为它直接订阅了输入可观察序列。
有什么better/cleaner方法可以转换可观察序列吗?
这是@Enigmativity 的充实回答:
var input = new Subject<ResponseMessage>();
var partialResponseObservable = input
.Select(msg =>
(msg is PartialResponse r)
? Notification.CreateOnNext(r)
: (msg is ResponseComplete)
? Notification.CreateOnCompleted<PartialResponse>()
: (msg is ResponseError err)
? Notification.CreateOnError<PartialResponse>(new Exception(err?.ToString()))
: throw new InvalidOperationException()
)
.Dematerialize();
或类型匹配(可能读起来更好):
var partialResponseObservable = input
.Select(msg =>
{
switch(msg)
{
case PartialResponse r:
return Notification.CreateOnNext(r);
case ResponseComplete rc:
return Notification.CreateOnCompleted<PartialResponse>();
case ResponseError err:
return Notification.CreateOnError<PartialResponse>(new Exception(err?.ToString()));
default:
throw new InvalidOperationException();
}
})
.Dematerialize();