RxSwift combineLatest 从最后一个 Observable 发出所有重放的元素
RxSwift combineLatest emits all replayed elements from last Observable
在冷可观察对象集合上使用 combineLatest 时,我看到了一些意想不到的结果。它发出除最后一个 Observable 之外的所有 Observable 中的最新元素,并将第一个 (n-1) 个 Observable 中的最新元素与第 n 个 Observable 中的每个元素组合起来。
let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()
observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)
observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")
let latestObserver = Observable.combineLatest(observable, observable2)
_ = latestObserver
.subscribe(onNext: {
print([=10=])
})
.disposed(by: disposeBag)
产生输出:
(4,“床”)
(4、《书》)
(4, "table")
我原以为会看到 (4, "table") 的输出。
如果我像这样更改可观察对象的顺序:
let latestObserver = Observable.combineLatest(observable2, observable)
我得到输出:
("table", 1)
("table", 2)
("table", 3)
("table", 4)
如果我添加一个最终的任意 Observable,那么我只会看到每个第一个 Observable 的最新版本:
let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()
let observable3 = Observable<Int>.just(42)
observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)
observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")
let latestObserver = Observable.combineLatest(observable, observable2, observable3)
_ = latestObserver
.subscribe(onNext: {
print([=12=])
})
.disposed(by: disposeBag)
产生输出:(4, "table", 42)
这真的是预期的行为吗?
让我们分解第一个示例中发生的事情...
你可以用Observable.from
代替subject,得到同样的结果...代码步骤如下,
- 创建两个
ReplaySubject
并加载事件。
combineLatest
运算符订阅了第一个主题。
- 第一个主题立即重播其所有值。
- 由于尚未订阅第二个主题,
combineLatest
运算符不会发出任何内容,而是静默吸收值,同时始终存储“最新”值。
combineLatest
运算符然后订阅第二个主题。
- 那个主题重播了所有它的值。
- 由于
combineLatest
运算符现在已从其每个源接收到下一个事件,因此它会发出从第二个源发出的值,并结合来自第一个源的最新(即最后一个)值。
您的重放主题本质上是同步的。他们在订阅后立即将所有值发送给订阅者。
在您的最后一个示例代码中,由于三个可观察对象中的最后一个仅发出一个值,并且只有在其他两个已发出所有值之后,您才能看到一个输出包含前两个可观察对象的最新值。
在冷可观察对象集合上使用 combineLatest 时,我看到了一些意想不到的结果。它发出除最后一个 Observable 之外的所有 Observable 中的最新元素,并将第一个 (n-1) 个 Observable 中的最新元素与第 n 个 Observable 中的每个元素组合起来。
let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()
observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)
observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")
let latestObserver = Observable.combineLatest(observable, observable2)
_ = latestObserver
.subscribe(onNext: {
print([=10=])
})
.disposed(by: disposeBag)
产生输出: (4,“床”) (4、《书》) (4, "table")
我原以为会看到 (4, "table") 的输出。
如果我像这样更改可观察对象的顺序:
let latestObserver = Observable.combineLatest(observable2, observable)
我得到输出: ("table", 1) ("table", 2) ("table", 3) ("table", 4)
如果我添加一个最终的任意 Observable,那么我只会看到每个第一个 Observable 的最新版本:
let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()
let observable3 = Observable<Int>.just(42)
observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)
observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")
let latestObserver = Observable.combineLatest(observable, observable2, observable3)
_ = latestObserver
.subscribe(onNext: {
print([=12=])
})
.disposed(by: disposeBag)
产生输出:(4, "table", 42)
这真的是预期的行为吗?
让我们分解第一个示例中发生的事情...
你可以用Observable.from
代替subject,得到同样的结果...代码步骤如下,
- 创建两个
ReplaySubject
并加载事件。 combineLatest
运算符订阅了第一个主题。- 第一个主题立即重播其所有值。
- 由于尚未订阅第二个主题,
combineLatest
运算符不会发出任何内容,而是静默吸收值,同时始终存储“最新”值。 combineLatest
运算符然后订阅第二个主题。- 那个主题重播了所有它的值。
- 由于
combineLatest
运算符现在已从其每个源接收到下一个事件,因此它会发出从第二个源发出的值,并结合来自第一个源的最新(即最后一个)值。
您的重放主题本质上是同步的。他们在订阅后立即将所有值发送给订阅者。
在您的最后一个示例代码中,由于三个可观察对象中的最后一个仅发出一个值,并且只有在其他两个已发出所有值之后,您才能看到一个输出包含前两个可观察对象的最新值。