忽略 Observable 中的异常并继续
Ignore Exception in Observable and Continue
考虑这个 Observable:
_listener = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes().ToObservable()
.SelectMany(CreateUdpListener, CreateMessage)
.OfType<DeviceMessage>()
.SelectMany(InjectTestMode)
.OfType<DeviceMessage>()
.Do(async message => await PublishMessage(message)))
.Retry()
.Subscribe(OnMessageReceive, OnError, OnComplete);
这工作正常,除非在 CreateMessage
或 InjectTestMode
中抛出异常。
我希望 Observable 跳过序列中产生异常的项目并继续。
我读过有关 Catch 的内容,但我发现的示例允许您启动一个新的 Observable,我想继续使用现有的。
目前整个序列重新启动,其中包括我想尽可能避免的 UDP 端口。
[更新]
我和一位同事重新阅读了一些关于使用 IEnumerable<IObservable<>>
或 IObservable<IObservable<>>
的评论,并想出了这个行之有效的方法!但这是right/best实践吗?
如果内部可观察对象发生异常,我想知道它是否只会丢弃来自正在运行的 ReceiveAsync
事件的数据包。
var listeners = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes()
.ToObservable()
.Select(UdpListener)
.SelectMany(listener =>
{
return Observable.Defer(() => Observable
.FromAsync(listener.UdpClient.ReceiveAsync)
.Where(x => x.Buffer.Length > 0)
.Repeat()
.Select(result => CreateMessage(listener.DeviceType, result))
.SelectMany(InjectTestMode)
.OfType<DeviceMessage>()
.Do(async message => await PublishMessage(message)))
.Retry();
})).Retry();
_listener = listeners.Subscribe(OnMessageReceive, OnError, OnComplete);
IObservable<T>
的文档指定序列必须匹配此语法:
OnNext* (OnCompleted|OnError)
异常或完成后不能再发出任何值。如果您手动创建违反此语法的可观察对象,则在使用任何现有 Rx 运算符时可能会出现未定义的行为。不好!
如果您想获得重试行为,请将您的查询建模为 IEnumerable<IObservable<T>>
或 IObservable<IObservable<T>>
,其中外部 IEnumerable<*>
或 IObservable<*>
永远不会抛出。
创建一些静态扩展函数,将委托传递给它,并将其包装到函数中的 try{}catch{} 中。
考虑这个 Observable:
_listener = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes().ToObservable()
.SelectMany(CreateUdpListener, CreateMessage)
.OfType<DeviceMessage>()
.SelectMany(InjectTestMode)
.OfType<DeviceMessage>()
.Do(async message => await PublishMessage(message)))
.Retry()
.Subscribe(OnMessageReceive, OnError, OnComplete);
这工作正常,除非在 CreateMessage
或 InjectTestMode
中抛出异常。
我希望 Observable 跳过序列中产生异常的项目并继续。
我读过有关 Catch 的内容,但我发现的示例允许您启动一个新的 Observable,我想继续使用现有的。 目前整个序列重新启动,其中包括我想尽可能避免的 UDP 端口。
[更新]
我和一位同事重新阅读了一些关于使用 IEnumerable<IObservable<>>
或 IObservable<IObservable<>>
的评论,并想出了这个行之有效的方法!但这是right/best实践吗?
如果内部可观察对象发生异常,我想知道它是否只会丢弃来自正在运行的 ReceiveAsync
事件的数据包。
var listeners = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes()
.ToObservable()
.Select(UdpListener)
.SelectMany(listener =>
{
return Observable.Defer(() => Observable
.FromAsync(listener.UdpClient.ReceiveAsync)
.Where(x => x.Buffer.Length > 0)
.Repeat()
.Select(result => CreateMessage(listener.DeviceType, result))
.SelectMany(InjectTestMode)
.OfType<DeviceMessage>()
.Do(async message => await PublishMessage(message)))
.Retry();
})).Retry();
_listener = listeners.Subscribe(OnMessageReceive, OnError, OnComplete);
IObservable<T>
的文档指定序列必须匹配此语法:
OnNext* (OnCompleted|OnError)
异常或完成后不能再发出任何值。如果您手动创建违反此语法的可观察对象,则在使用任何现有 Rx 运算符时可能会出现未定义的行为。不好!
如果您想获得重试行为,请将您的查询建模为 IEnumerable<IObservable<T>>
或 IObservable<IObservable<T>>
,其中外部 IEnumerable<*>
或 IObservable<*>
永远不会抛出。
创建一些静态扩展函数,将委托传递给它,并将其包装到函数中的 try{}catch{} 中。