RxSwift - PublishSubject - 忽略错误并继续订阅(不要处置)
RxSwift - PublishSubject - Ignore error and continue subscription (do not dispose)
我找不到足够的例子来说明如何做到这一点。
本质上,我们有一个PublishSubject
,它只是一个直通;我们没有对其进行任何操作。我们相信如果有任何错误,我们将放弃未来的事件,因为它只是转发它收到的作为订阅信号的值,不确定为什么会出现错误。
(对 Q 的第 2 条评论),我们如何忽略主题上的任何错误并保持订阅有效?
如果没有选项,有没有办法创建一个新订阅onError
并继续收听未来.next()
?
let pubSubj = PublishSubject<String>()
let obs = pubSubj
.observeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.catchError { (err) -> Observable<String> in
print("***** catchError \(err)\n")
return .never() // expecting this not to terminate the subscription
}
print("***** obs: \(obs)\n")
let dispose = obs.subscribe(onNext: { (str) in
print("***** received onNext: \(str)\n")
}, onError: { (err) in
print("***** received onErr: \(err)\n")
}, onCompleted: {
print("***** completed\n")
}, onDisposed: {
print("***** onDisposed\n")
})
pubSubj.on(.next("2"))
pubSubj.onError(RxError.overflow) //emits error and terminates
pubSubj.on(.next("3")) //is not received, how can we keep from getting disposed
输出:
***** obs: RxSwift.(unknown context at 24f80f0).Catch<Swift.String>
***** received onNext: 2
***** catchError Arithmetic overflow occurred.
其他 Rx 实现似乎有 ,RxSwift 不支持。
但是有答案指向.catchError
,但是从我们的例子来看,它在错误之后仍然处理,我们没有收到“3
”
在反应流中收到错误后,您将无法接收任何事件,因为流将完成,这对于所有反应式库(包括 RxSwift)始终如此。 catch
将使用闭包中的 Observable
你 return 继续流,但外部流无法再次发出:
let pubSubj = PublishSubject<String>()
let recover = PublishSubject<String>()
let obs = pubSubj
.catch { _ in
recover
}
print("***** obs: \(obs)\n")
let dispose = obs.subscribe(onNext: { (str) in
print("***** received onNext: \(str)\n")
}, onError: { err in
print("***** received onErr: \(err)\n")
}, onCompleted: {
print("***** completed\n")
}, onDisposed: {
print("***** onDisposed\n")
})
pubSubj.onNext("2")
pubSubj.onError(RxError.overflow)
pubSubj.onNext("3") // Will not be sent
recover.onNext("3") // Will be sent
您有几个选项可以实现您的需求:
- 不要在
PublishSubject
中输入错误。本质上 pubSubj.onError(RxError.overflow)
永远不应该发生。
- 如果您需要在
PublishSubject
中提供错误,那么您可以实现它。通过这种方式,错误将作为 next
事件发送,并且流将不会完成,例如:
let pubSubj = PublishSubject<Event<String>>()
let obs = pubSubj
print("***** obs: \(obs)\n")
let dispose = obs.subscribe(onNext: { (str) in
print("***** received onNext: \(str)\n")
}, onCompleted: {
print("***** completed\n")
}, onDisposed: {
print("***** onDisposed\n")
})
pubSubj.onNext(.next("2"))
pubSubj.onNext(.error(RxError.overflow))
pubSubj.onNext(.next("3"))
我找不到足够的例子来说明如何做到这一点。
本质上,我们有一个PublishSubject
,它只是一个直通;我们没有对其进行任何操作。我们相信如果有任何错误,我们将放弃未来的事件,因为它只是转发它收到的作为订阅信号的值,不确定为什么会出现错误。
如果没有选项,有没有办法创建一个新订阅onError
并继续收听未来.next()
?
let pubSubj = PublishSubject<String>()
let obs = pubSubj
.observeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.catchError { (err) -> Observable<String> in
print("***** catchError \(err)\n")
return .never() // expecting this not to terminate the subscription
}
print("***** obs: \(obs)\n")
let dispose = obs.subscribe(onNext: { (str) in
print("***** received onNext: \(str)\n")
}, onError: { (err) in
print("***** received onErr: \(err)\n")
}, onCompleted: {
print("***** completed\n")
}, onDisposed: {
print("***** onDisposed\n")
})
pubSubj.on(.next("2"))
pubSubj.onError(RxError.overflow) //emits error and terminates
pubSubj.on(.next("3")) //is not received, how can we keep from getting disposed
输出:
***** obs: RxSwift.(unknown context at 24f80f0).Catch<Swift.String>
***** received onNext: 2
***** catchError Arithmetic overflow occurred.
其他 Rx 实现似乎有
但是有答案指向.catchError
,但是从我们的例子来看,它在错误之后仍然处理,我们没有收到“3
”
在反应流中收到错误后,您将无法接收任何事件,因为流将完成,这对于所有反应式库(包括 RxSwift)始终如此。 catch
将使用闭包中的 Observable
你 return 继续流,但外部流无法再次发出:
let pubSubj = PublishSubject<String>()
let recover = PublishSubject<String>()
let obs = pubSubj
.catch { _ in
recover
}
print("***** obs: \(obs)\n")
let dispose = obs.subscribe(onNext: { (str) in
print("***** received onNext: \(str)\n")
}, onError: { err in
print("***** received onErr: \(err)\n")
}, onCompleted: {
print("***** completed\n")
}, onDisposed: {
print("***** onDisposed\n")
})
pubSubj.onNext("2")
pubSubj.onError(RxError.overflow)
pubSubj.onNext("3") // Will not be sent
recover.onNext("3") // Will be sent
您有几个选项可以实现您的需求:
- 不要在
PublishSubject
中输入错误。本质上pubSubj.onError(RxError.overflow)
永远不应该发生。 - 如果您需要在
PublishSubject
中提供错误,那么您可以实现它。通过这种方式,错误将作为next
事件发送,并且流将不会完成,例如:
let pubSubj = PublishSubject<Event<String>>()
let obs = pubSubj
print("***** obs: \(obs)\n")
let dispose = obs.subscribe(onNext: { (str) in
print("***** received onNext: \(str)\n")
}, onCompleted: {
print("***** completed\n")
}, onDisposed: {
print("***** onDisposed\n")
})
pubSubj.onNext(.next("2"))
pubSubj.onNext(.error(RxError.overflow))
pubSubj.onNext(.next("3"))