RxSwift combineLatest 从最后一个 Observable 发出所有重放的元素

RxSwift combineLatest emits all replayed elements from last Observable

在冷可观察对象集合上使用 combineLatest 时,我看到了一些意想不到的结果。它发出除最后一个 Observable 之外的所有 Observable 中的最新元素,并将第一个 (n-1) 个 Observable 中的最新元素与第 n 个 Observable 中的每个元素组合起来。

let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")

let latestObserver = Observable.combineLatest(observable, observable2)

_ = latestObserver
    .subscribe(onNext: {
    print([=10=])
})
.disposed(by: disposeBag)

产生输出: (4,“床”) (4、《书》) (4, "table")

我原以为会看到 (4, "table") 的输出。

如果我像这样更改可观察对象的顺序:

let latestObserver = Observable.combineLatest(observable2, observable)

我得到输出: ("table", 1) ("table", 2) ("table", 3) ("table", 4)

如果我添加一个最终的任意 Observable,那么我只会看到每个第一个 Observable 的最新版本:

let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()
let observable3 = Observable<Int>.just(42)

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")

let latestObserver = Observable.combineLatest(observable, observable2, observable3)

_ = latestObserver
    .subscribe(onNext: {
    print([=12=])
})
.disposed(by: disposeBag)

产生输出:(4, "table", 42)

这真的是预期的行为吗?

让我们分解第一个示例中发生的事情...

你可以用Observable.from代替subject,得到同样的结果...代码步骤如下,

  1. 创建两个 ReplaySubject 并加载事件。
  2. combineLatest 运算符订阅了第一个主题。
  3. 第一个主题立即重播其所有值。
  4. 由于尚未订阅第二个主题,combineLatest 运算符不会发出任何内容,而是静默吸收值,同时始终存储“最新”值。
  5. combineLatest 运算符然后订阅第二个主题。
  6. 那个主题重播了所有它的值。
  7. 由于 combineLatest 运算符现在已从其每个源接收到下一个事件,因此它会发出从第二个源发出的值,并结合来自第一个源的最新(即最后一个)值。

您的重放主题本质上是同步的。他们在订阅后立即将所有值发送给订阅者。

在您的最后一个示例代码中,由于三个可观察对象中的最后一个仅发出一个值,并且只有在其他两个已发出所有值之后,您才能看到一个输出包含前两个可观察对象的最新值。