RxSwift 运营商如何影响重新订阅?
How do RxSwift operators affect resubscribes?
我在 RxSwift 中使用 retry
运算符。根据它的文档,它会在遇到错误时“重新订阅”可观察到的源。
一切都很好。但是,当它包含热可观察对象或 hot/cold 可观察对象的混合体时,我不确定如何推理“源可观察对象”。
一个我确定的例子:
let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let coldObservableRetry = coldObservable.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(2) // retry is inclusive of original attempt
coldObservableRetry.subscribe(onNext: { print ([=10=])} ) // prints 1, 2, 1, 2, before erroring out.
一个我不确定的例子:
let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish().connect()
let hotObservableRetry = hotObservable.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(2)
hotObservableRetry.subscribe(onNext: { print ([=11=])} ) // What happens here?
另一个我不确定的例子:
let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let delayedHotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).delay(.milliseconds(100), scheduler: MainScheduler.instance).publish().connect()
let mixtureObservableRetry = Observable.combineLatest(coldObservable, delayedHotObservable).map { [=12=] + }.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(5)
mixtureObservableRetry.subscribe(onNext: { print ([=12=])} ) // What happens here? What does it even mean to resubscribe to a combineLatest with a hot and a cold observable?
又一个我不确定的例子:
let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish().connect()
let mixtureObservable2Retry = coldObservable.flatMapLatest { _ in hotObservable }
.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(2)
mixtureObservable2Retry.subscribe(onNext: { print ([=13=])} ) // What happens here?
您的许多示例都无法编译,所以会出现编译错误。 :-) 但要回答问题...
有一些重要的事情要记住。
- 每个运算符都订阅其源 observable 并生成一个新的 observable。
- 当一个冷的 observable 被订阅时,它会发出 0...N
next
个事件,然后是一个停止事件(可以是 completed
或 error
。)它将为每个订阅提供自己的一组事件。
- Hot observable 在
connect
被调用之前不会开始发射。所有订阅将共享同一组事件。
最后在这种情况下,debug
是你的朋友。
因此,对于您的第一个示例,您不确定(进行了调整,以便编译并 运行 添加了一些调试运算符):
func example() {
let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.debug("before publish")
.publish()
let hotObservableRetry = hotObservable
.debug("after publish")
.map { (num) -> Int in
guard num % 3 != 0 else { throw MyError() }
return num
}
.debug("after map")
.retry(2)
.debug("after retry")
_ = hotObservableRetry.subscribe()
_ = hotObservable.connect()
}
将产生以下输出。
以下是关于有助于学习过程的输出的一些注意事项。
- 订阅以相反的顺序进行。
- 一旦发出第一个错误,重试运算符就会重新订阅地图运算符的可观察对象,地图运算符会重新订阅发布的可观察对象。由于发布的 observable 是热的,重新订阅链就停在那里。计时器的可观察对象没有被重新订阅。
以上是回答本题的要点。
after retry -> subscribed
after map -> subscribed
after publish -> subscribed
before publish -> subscribed
before publish -> Event next(0)
after publish -> Event next(0)
after map -> Event error(MyError())
after map -> isDisposed
after publish -> isDisposed
after map -> subscribed
after publish -> subscribed
before publish -> Event next(1)
after publish -> Event next(1)
after map -> Event next(1)
after retry -> Event next(1)
before publish -> Event next(2)
after publish -> Event next(2)
after map -> Event next(2)
after retry -> Event next(2)
before publish -> Event next(3)
after publish -> Event next(3)
after map -> Event error(MyError())
after retry -> Event error(MyError())
after retry -> isDisposed
after map -> isDisposed
after publish -> isDisposed
before publish -> Event next(4)
before publish -> Event next(5)
before publish -> Event next(6)
...
在您展示的下一个示例中,combineLatest 运算符重新订阅热可观察对象,但可连接可观察对象不会重新订阅其 源。
我在 RxSwift 中使用 retry
运算符。根据它的文档,它会在遇到错误时“重新订阅”可观察到的源。
一切都很好。但是,当它包含热可观察对象或 hot/cold 可观察对象的混合体时,我不确定如何推理“源可观察对象”。
一个我确定的例子:
let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let coldObservableRetry = coldObservable.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(2) // retry is inclusive of original attempt
coldObservableRetry.subscribe(onNext: { print ([=10=])} ) // prints 1, 2, 1, 2, before erroring out.
一个我不确定的例子:
let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish().connect()
let hotObservableRetry = hotObservable.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(2)
hotObservableRetry.subscribe(onNext: { print ([=11=])} ) // What happens here?
另一个我不确定的例子:
let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let delayedHotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).delay(.milliseconds(100), scheduler: MainScheduler.instance).publish().connect()
let mixtureObservableRetry = Observable.combineLatest(coldObservable, delayedHotObservable).map { [=12=] + }.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(5)
mixtureObservableRetry.subscribe(onNext: { print ([=12=])} ) // What happens here? What does it even mean to resubscribe to a combineLatest with a hot and a cold observable?
又一个我不确定的例子:
let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish().connect()
let mixtureObservable2Retry = coldObservable.flatMapLatest { _ in hotObservable }
.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(2)
mixtureObservable2Retry.subscribe(onNext: { print ([=13=])} ) // What happens here?
您的许多示例都无法编译,所以会出现编译错误。 :-) 但要回答问题...
有一些重要的事情要记住。
- 每个运算符都订阅其源 observable 并生成一个新的 observable。
- 当一个冷的 observable 被订阅时,它会发出 0...N
next
个事件,然后是一个停止事件(可以是completed
或error
。)它将为每个订阅提供自己的一组事件。 - Hot observable 在
connect
被调用之前不会开始发射。所有订阅将共享同一组事件。
最后在这种情况下,debug
是你的朋友。
因此,对于您的第一个示例,您不确定(进行了调整,以便编译并 运行 添加了一些调试运算符):
func example() {
let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.debug("before publish")
.publish()
let hotObservableRetry = hotObservable
.debug("after publish")
.map { (num) -> Int in
guard num % 3 != 0 else { throw MyError() }
return num
}
.debug("after map")
.retry(2)
.debug("after retry")
_ = hotObservableRetry.subscribe()
_ = hotObservable.connect()
}
将产生以下输出。
以下是关于有助于学习过程的输出的一些注意事项。
- 订阅以相反的顺序进行。
- 一旦发出第一个错误,重试运算符就会重新订阅地图运算符的可观察对象,地图运算符会重新订阅发布的可观察对象。由于发布的 observable 是热的,重新订阅链就停在那里。计时器的可观察对象没有被重新订阅。
以上是回答本题的要点。
after retry -> subscribed
after map -> subscribed
after publish -> subscribed
before publish -> subscribed
before publish -> Event next(0)
after publish -> Event next(0)
after map -> Event error(MyError())
after map -> isDisposed
after publish -> isDisposed
after map -> subscribed
after publish -> subscribed
before publish -> Event next(1)
after publish -> Event next(1)
after map -> Event next(1)
after retry -> Event next(1)
before publish -> Event next(2)
after publish -> Event next(2)
after map -> Event next(2)
after retry -> Event next(2)
before publish -> Event next(3)
after publish -> Event next(3)
after map -> Event error(MyError())
after retry -> Event error(MyError())
after retry -> isDisposed
after map -> isDisposed
after publish -> isDisposed
before publish -> Event next(4)
before publish -> Event next(5)
before publish -> Event next(6)
...
在您展示的下一个示例中,combineLatest 运算符重新订阅热可观察对象,但可连接可观察对象不会重新订阅其 源。