忽略 Observable 中的异常并继续

Ignore Exception in Observable and Continue

考虑这个 Observable:

_listener = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes().ToObservable()
            .SelectMany(CreateUdpListener, CreateMessage)
            .OfType<DeviceMessage>()
            .SelectMany(InjectTestMode)
            .OfType<DeviceMessage>()
            .Do(async message => await PublishMessage(message)))
            .Retry()
            .Subscribe(OnMessageReceive, OnError, OnComplete);

这工作正常,除非在 CreateMessageInjectTestMode 中抛出异常。

我希望 Observable 跳过序列中产生异常的项目并继续。

我读过有关 Catch 的内容,但我发现的示例允许您启动一个新的 Observable,我想继续使用现有的。 目前整个序列重新启动,其中包括我想尽可能避免的 UDP 端口。

[更新]

我和一位同事重新阅读了一些关于使用 IEnumerable<IObservable<>>IObservable<IObservable<>> 的评论,并想出了这个行之有效的方法!但这是right/best实践吗?

如果内部可观察对象发生异常,我想知道它是否只会丢弃来自正在运行的 ReceiveAsync 事件的数据包。

var listeners = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes()
                .ToObservable()
                .Select(UdpListener)
                .SelectMany(listener =>
                {
                  return Observable.Defer(() => Observable
                    .FromAsync(listener.UdpClient.ReceiveAsync)
                    .Where(x => x.Buffer.Length > 0)
                    .Repeat()
                    .Select(result => CreateMessage(listener.DeviceType, result))
                    .SelectMany(InjectTestMode)
                    .OfType<DeviceMessage>()
                    .Do(async message => await PublishMessage(message)))
                    .Retry();
                })).Retry();

_listener = listeners.Subscribe(OnMessageReceive, OnError, OnComplete);

IObservable<T> 的文档指定序列必须匹配此语法:

OnNext* (OnCompleted|OnError)

异常或完成后不能再发出任何值。如果您手动创建违反此语法的可观察对象,则在使用任何现有 Rx 运算符时可能会出现未定义的行为。不好!

如果您想获得重试行为,请将您的查询建模为 IEnumerable<IObservable<T>>IObservable<IObservable<T>>,其中外部 IEnumerable<*>IObservable<*> 永远不会抛出。

创建一些静态扩展函数,将委托传递给它,并将其包装到函数中的 try{}catch{} 中。