如何使用 RxJava2 创建谓词

How to create a predicate using RxJava2

我正在尝试学习 Rxjava,但仍处于非常基础的水平。我正在寻找有关以下问题的一些指导。

我有这两个 Observable,即

    fun getPlayheadPositionInMilliseconds(): Observable<Long> {
        return ConnectableObservable.interval(1000, TimeUnit.MILLISECONDS)
            .map { exoPlayer.currentPosition }
            .publish()
            .autoConnect()
    }

    fun getContentDurationInMilliseconds(): Observable<Long> {
        return ConnectableObservable.just(exoPlayer.duration)
            .publish()
            .autoConnect()
    }

现在我正尝试从这些创建谓词。我的要求是当 getPlayheadPositionInMilliseconds 达到 getContentDurationInMilliseconds 的 70% 时我想发出一个布尔信号。

如您所见,从 getPlayheadPositionInMilliseconds 开始,订阅者将在每 1000 毫秒后获得新值,现在我想将该值与我从 getContentDurationInMilliseconds 获得的总持续时间进行比较。当 getPlayheadPositionInMilliseconds 值达到 getContentDurationInMilliseconds 的 70% 时,将引发布尔信号。

我知道如何在不使用 RxJava 的情况下做到这一点,但我正在寻找一种在 RxJava 中做到这一点的方法。如果需要更多信息,请告诉我。

如果我理解正确,您希望 Observable<Boolean> 每次发出一个子可观察量时发出,并且布尔值根据您的谓词为真。这可以通过以下方式实现:

    // Emits items when either child observable emits
    fun isPlayheadPosition70PercentOfContentDuration(): Observable<Boolean> =
        Observables                                         // Helper class from RxKotlin package
            .combineLatest(                                 // Emit a Pair<Long, Long> of values from the
                getPlayheadPositionInMilliseconds(),        // latest of the child observables
                getContentDurationInMilliseconds()
            )
            .map { (playheadPosition, contentDuration) ->   // Transform the item based on this function
                playheadPosition >= 0.7 * contentDuration   // (in this case, into a Boolean based on a predicate)
            }

其中 Observables 是一个助手 class,它具有来自 RxKotlin 的有用方法,这是在 Kotlin 中编写 Rx 时的必备工具。

我想也许您可能还想在满足谓词时立即触发一个事件。在这种情况下,您可以将上面的内容转换为 Completable,当您的谓词首次变为真时触发:

    // If you subscribe to this, the onComplete will signal once the predicate is satisfied
    fun playheadPositionHasReached70PercentContentDuration(): Completable =
        Observables
            .combineLatest(getPlayheadPositionInMilliseconds(), getContentDurationInMilliseconds())

            // Filter: Only allow elements through the stream if they satisfy this predicate
            .filter { (playheadPosition, contentDuration) -> playheadPosition >= 0.7 * contentDuration }

            // Take only the first element (i.e. turn this into a Single)
            .firstOrError()

            // Actually, I don't care about the element, I just want to know when it happens
            // (i.e. turn this into a Completable)
            .ignoreElement()