如何在 RxJava 中只获取 zip 的最后一个值?

How to take only the last values for zip in RxJava?

我有两个可观察对象,一个可以非常快地发出事件。我需要同步它们,例如当 observable1 发出时,我从 observable2 中取出最后一个并做一些事情。 我是怎么做到的:

Observable<PairingState> pairingState = CoroutinesHelperKt.toObservable(mManager.getPairingStatusFlow())
            .filter(state -> state == PairingState.Paired);

Observable<...> added = mCallbackManager.getSomeObservable("...")
            .filter(participant -> !participant.participant.getId().equals(mUserManager.getUserId()));

mStateSubscription = Observable.zip(pairingState, added, (state, user) -> true)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .delay(MESSAGE_SEND_DELAY, TimeUnit.MILLISECONDS)
            .subscribe(event -> {
                //do some stuff
            });

问题:如何只取observable2的最后一个?

我正在尝试解决 2 发出 100 个元素时的问题,而我只有一个来自 observable1

我想应该使用.combineLatest。所以它会是:

mStateSubscription = Observable.combineLatest(pairingState, added, (state, user) -> true)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .delay(MESSAGE_SEND_DELAY, TimeUnit.MILLISECONDS)
            .subscribe(event -> {
                //do some stuff
            });

虽然 combineLatest 可能是您所需要的,但您还应该查看 withLatestFrom

  • combineLatest:当 observable 1 发出时,使用 observable 2 的最新值。当 observable 2 发出时,使用 observable 1 的最新值。
  • withLatestFrom:当 observable 1 发出时,使用 observable 2 的最新值。当 observable 2 发出时,仅更新内部状态。