RxSwift - PublishSubject - 忽略错误并继续订阅(不要处置)

RxSwift - PublishSubject - Ignore error and continue subscription (do not dispose)

我找不到足够的例子来说明如何做到这一点。

本质上,我们有一个PublishSubject,它只是一个直通;我们没有对其进行任何操作。我们相信如果有任何错误,我们将放弃未来的事件,因为它只是转发它收到的作为订阅信号的值,不确定为什么会出现错误。

(对 Q 的第 2 条评论),我们如何忽略主题上的任何错误并保持订阅有效?

如果没有选项,有没有办法创建一个新订阅onError并继续收听未来.next()

let pubSubj = PublishSubject<String>()
let obs = pubSubj
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .background))
    .catchError { (err) -> Observable<String> in
        print("***** catchError \(err)\n")
        return .never() // expecting this not to terminate the subscription
    }

print("***** obs: \(obs)\n")


let dispose = obs.subscribe(onNext: { (str) in
    print("***** received onNext: \(str)\n")
}, onError: { (err) in
    print("***** received onErr: \(err)\n")
}, onCompleted: {
    print("***** completed\n")
}, onDisposed: {
    print("***** onDisposed\n")
})
                    
pubSubj.on(.next("2"))
pubSubj.onError(RxError.overflow) //emits error and terminates
            
pubSubj.on(.next("3")) //is not received, how can we keep from getting disposed

输出:

***** obs: RxSwift.(unknown context at 24f80f0).Catch<Swift.String>
***** received onNext: 2
***** catchError Arithmetic overflow occurred.

其他 Rx 实现似乎有 ,RxSwift 不支持。

但是有答案指向.catchError,但是从我们的例子来看,它在错误之后仍然处理,我们没有收到“3

在反应流中收到错误后,您将无法接收任何事件,因为流将完成,这对于所有反应式库(包括 RxSwift)始终如此。 catch 将使用闭包中的 Observable 你 return 继续流,但外部流无法再次发出:

  let pubSubj = PublishSubject<String>()
  let recover = PublishSubject<String>()
  let obs = pubSubj
    .catch { _ in
      recover
    }

  print("***** obs: \(obs)\n")


  let dispose = obs.subscribe(onNext: { (str) in
    print("***** received onNext: \(str)\n")
  }, onError: { err in
    print("***** received onErr: \(err)\n")
  }, onCompleted: {
    print("***** completed\n")
  }, onDisposed: {
    print("***** onDisposed\n")
  })
                      
  pubSubj.onNext("2")
  pubSubj.onError(RxError.overflow)
  pubSubj.onNext("3") // Will not be sent
  recover.onNext("3") // Will be sent

您有几个选项可以实现您的需求:

  1. 不要在 PublishSubject 中输入错误。本质上 pubSubj.onError(RxError.overflow) 永远不应该发生。
  2. 如果您需要在 PublishSubject 中提供错误,那么您可以实现它。通过这种方式,错误将作为 next 事件发送,并且流将不会完成,例如:
  let pubSubj = PublishSubject<Event<String>>()
  let obs = pubSubj

  print("***** obs: \(obs)\n")


  let dispose = obs.subscribe(onNext: { (str) in
      print("***** received onNext: \(str)\n")
  }, onCompleted: {
      print("***** completed\n")
  }, onDisposed: {
      print("***** onDisposed\n")
  })
                      
  pubSubj.onNext(.next("2"))
  pubSubj.onNext(.error(RxError.overflow))
  pubSubj.onNext(.next("3"))