如何在 RxJava 中只获取 zip 的最后一个值?
How to take only the last values for zip in RxJava?
我有两个可观察对象,一个可以非常快地发出事件。我需要同步它们,例如当 observable1 发出时,我从 observable2 中取出最后一个并做一些事情。
我是怎么做到的:
Observable<PairingState> pairingState = CoroutinesHelperKt.toObservable(mManager.getPairingStatusFlow())
.filter(state -> state == PairingState.Paired);
Observable<...> added = mCallbackManager.getSomeObservable("...")
.filter(participant -> !participant.participant.getId().equals(mUserManager.getUserId()));
mStateSubscription = Observable.zip(pairingState, added, (state, user) -> true)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.delay(MESSAGE_SEND_DELAY, TimeUnit.MILLISECONDS)
.subscribe(event -> {
//do some stuff
});
问题:如何只取observable2的最后一个?
我正在尝试解决 2 发出 100 个元素时的问题,而我只有一个来自 observable1
我想应该使用.combineLatest。所以它会是:
mStateSubscription = Observable.combineLatest(pairingState, added, (state, user) -> true)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.delay(MESSAGE_SEND_DELAY, TimeUnit.MILLISECONDS)
.subscribe(event -> {
//do some stuff
});
虽然 combineLatest
可能是您所需要的,但您还应该查看 withLatestFrom
。
combineLatest
:当 observable 1 发出时,使用 observable 2 的最新值。当 observable 2 发出时,使用 observable 1 的最新值。
withLatestFrom
:当 observable 1 发出时,使用 observable 2 的最新值。当 observable 2 发出时,仅更新内部状态。
我有两个可观察对象,一个可以非常快地发出事件。我需要同步它们,例如当 observable1 发出时,我从 observable2 中取出最后一个并做一些事情。 我是怎么做到的:
Observable<PairingState> pairingState = CoroutinesHelperKt.toObservable(mManager.getPairingStatusFlow())
.filter(state -> state == PairingState.Paired);
Observable<...> added = mCallbackManager.getSomeObservable("...")
.filter(participant -> !participant.participant.getId().equals(mUserManager.getUserId()));
mStateSubscription = Observable.zip(pairingState, added, (state, user) -> true)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.delay(MESSAGE_SEND_DELAY, TimeUnit.MILLISECONDS)
.subscribe(event -> {
//do some stuff
});
问题:如何只取observable2的最后一个?
我正在尝试解决 2 发出 100 个元素时的问题,而我只有一个来自 observable1
我想应该使用.combineLatest。所以它会是:
mStateSubscription = Observable.combineLatest(pairingState, added, (state, user) -> true)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.delay(MESSAGE_SEND_DELAY, TimeUnit.MILLISECONDS)
.subscribe(event -> {
//do some stuff
});
虽然 combineLatest
可能是您所需要的,但您还应该查看 withLatestFrom
。
combineLatest
:当 observable 1 发出时,使用 observable 2 的最新值。当 observable 2 发出时,使用 observable 1 的最新值。withLatestFrom
:当 observable 1 发出时,使用 observable 2 的最新值。当 observable 2 发出时,仅更新内部状态。