RxSwift 运营商如何影响重新订阅?

How do RxSwift operators affect resubscribes?

我在 RxSwift 中使用 retry 运算符。根据它的文档,它会在遇到错误时“重新订阅”可观察到的源。

一切都很好。但是,当它包含热可观察对象或 hot/cold 可观察对象的混合体时,我不确定如何推理“源可观察对象”。

一个我确定的例子:

let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let coldObservableRetry = coldObservable.flatMapLatest { num in 
      if num % 3 == 0 { return .just(num) } 
      else { return .error() }
    }
    .retry(2) // retry is inclusive of original attempt
coldObservableRetry.subscribe(onNext: { print ([=10=])} ) // prints 1, 2, 1, 2, before erroring out.

一个我不确定的例子:

let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish().connect()
let hotObservableRetry = hotObservable.flatMapLatest { num in 
      if num % 3 == 0 { return .just(num) } 
      else { return .error() }
    }
    .retry(2)
hotObservableRetry.subscribe(onNext: { print ([=11=])} ) // What happens here?

另一个我不确定的例子:

let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let delayedHotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).delay(.milliseconds(100), scheduler: MainScheduler.instance).publish().connect()
let mixtureObservableRetry = Observable.combineLatest(coldObservable, delayedHotObservable).map { [=12=] +  }.flatMapLatest { num in 
      if num % 3 == 0 { return .just(num) } 
      else { return .error() }
    }
    .retry(5)
mixtureObservableRetry.subscribe(onNext: { print ([=12=])} ) // What happens here? What does it even mean to resubscribe to a combineLatest with a hot and a cold observable?

又一个我不确定的例子:

let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish().connect()
let mixtureObservable2Retry = coldObservable.flatMapLatest { _ in hotObservable }
.flatMapLatest { num in 
      if num % 3 == 0 { return .just(num) } 
      else { return .error() }
    }
    .retry(2)
mixtureObservable2Retry.subscribe(onNext: { print ([=13=])} ) // What happens here?

您的许多示例都无法编译,所以会出现编译错误。 :-) 但要回答问题...

有一些重要的事情要记住。

  1. 每个运算符都订阅其源 observable 并生成一个新的 observable。
  2. 当一个冷的 observable 被订阅时,它会发出 0...N next 个事件,然后是一个停止事件(可以是 completederror。)它将为每个订阅提供自己的一组事件。
  3. Hot observable 在 connect 被调用之前不会开始发射。所有订阅将共享同一组事件。

最后在这种情况下,debug 是你的朋友。

因此,对于您的第一个示例,您不确定(进行了调整,以便编译并 运行 添加了一些调试运算符):

func example() {
    let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
        .debug("before publish")
        .publish()
    let hotObservableRetry = hotObservable
        .debug("after publish")
        .map { (num) -> Int in
            guard num % 3 != 0 else { throw MyError() }
            return num
        }
        .debug("after map")
        .retry(2)
        .debug("after retry")
    _ = hotObservableRetry.subscribe()
    _ = hotObservable.connect()
}

将产生以下输出。

以下是关于有助于学习过程的输出的一些注意事项。

  • 订阅以相反的顺序进行。
  • 一旦发出第一个错误,重试运算符就会重新订阅地图运算符的可观察对象,地图运算符会重新订阅发布的可观察对象。由于发布的 observable 是热的,重新订阅链就停在那里。计时器的可观察对象没有被重新订阅。

以上是回答本题的要点。

after retry -> subscribed
after map -> subscribed
after publish -> subscribed
before publish -> subscribed
before publish -> Event next(0)
after publish -> Event next(0)
after map -> Event error(MyError())
after map -> isDisposed
after publish -> isDisposed
after map -> subscribed
after publish -> subscribed
before publish -> Event next(1)
after publish -> Event next(1)
after map -> Event next(1)
after retry -> Event next(1)
before publish -> Event next(2)
after publish -> Event next(2)
after map -> Event next(2)
after retry -> Event next(2)
before publish -> Event next(3)
after publish -> Event next(3)
after map -> Event error(MyError())
after retry -> Event error(MyError())
after retry -> isDisposed
after map -> isDisposed
after publish -> isDisposed
before publish -> Event next(4)
before publish -> Event next(5)
before publish -> Event next(6)
...

在您展示的下一个示例中,combineLatest 运算符重新订阅热可观察对象,但可连接可观察对象不会重新订阅 源。