RxJava:仅在给定时间段内出现第一项时才发出

RxJava: Only emit if the first item for a given period

我想以这样一种方式转换我的源 Flowable,即只有当它们是指定时间段内的第一个项目时事件才会通过。

也就是说,我希望第一个项目通过,然后丢弃所有后续项目,直到有一段时间(比如 10 秒)没有上游事件到达。

注意这两者都不是

我现在是这样解决的:

source
  .flatMap { Flowable.just(1).concatWith(Flowable.just(-1).delay(10, TimeUnit.SECONDS)) }
  .scan(0, { x, y -> x + y })
  .map { it > 0 }
  .distinctUntilChanged()
  .filter { it }

注意:我不关心 source 中的实际项目,只关心它们出现 - 但是,当然,我可以将项目与 Pair 一起包装起来15=] 或 -1).

有没有更简单的方法使用内置的 RxJava(2) 运算符来实现相同的目标?

可以使用 switchMap 一次只订阅一个 Flowable 并使用布尔值检查是否必须发出:

class ReduceThrottle<T>(val period: Long, val unit: TimeUnit) : FlowableTransformer<T, T> {
    override fun apply(upstream: Flowable<T>): Publisher<T> {
        return Flowable.defer {
            val doEmit = AtomicBoolean(true)

            upstream.switchMap { item ->
                val ret = if (doEmit.compareAndSet(true, false)) {
                    // We haven't emitted in the last 10 seconds, do the emission
                    Flowable.just(item)
                } else {
                    Flowable.empty()
                }

                ret.concatWith(Completable.timer(period, unit).andThen(Completable.fromAction {
                    // Once the timer successfully expires, reset the state
                    doEmit.set(true)
                }).toFlowable())
            }
        }
    }
}

那么只需应用转换器即可:source.compose(ReduceThrottle(10, TimeUnit.SECONDS))

这可能会满足您的需要

source.debounce(item -> Observable.timer(10,TimeUnit.SECONDS))