RxJava:仅在给定时间段内出现第一项时才发出
RxJava: Only emit if the first item for a given period
我想以这样一种方式转换我的源 Flowable,即只有当它们是指定时间段内的第一个项目时事件才会通过。
也就是说,我希望第一个项目通过,然后丢弃所有后续项目,直到有一段时间(比如 10 秒)没有上游事件到达。
注意这两者都不是
debounce
:这将发出每个项目,前提是它没有跟随另一个项目 10 秒 - 但这将强制甚至第一个项目延迟 10 秒。我想立即发出第一个项目。
throttleFirst
:这将发出第一个项目,然后在第一个项目之后丢弃所有后续项目 10 秒。我想在每个上游项目之后重置阻塞期。
我现在是这样解决的:
source
.flatMap { Flowable.just(1).concatWith(Flowable.just(-1).delay(10, TimeUnit.SECONDS)) }
.scan(0, { x, y -> x + y })
.map { it > 0 }
.distinctUntilChanged()
.filter { it }
注意:我不关心 source
中的实际项目,只关心它们出现 - 但是,当然,我可以将项目与 Pair
一起包装起来15=] 或 -1
).
有没有更简单的方法使用内置的 RxJava(2) 运算符来实现相同的目标?
可以使用 switchMap
一次只订阅一个 Flowable
并使用布尔值检查是否必须发出:
class ReduceThrottle<T>(val period: Long, val unit: TimeUnit) : FlowableTransformer<T, T> {
override fun apply(upstream: Flowable<T>): Publisher<T> {
return Flowable.defer {
val doEmit = AtomicBoolean(true)
upstream.switchMap { item ->
val ret = if (doEmit.compareAndSet(true, false)) {
// We haven't emitted in the last 10 seconds, do the emission
Flowable.just(item)
} else {
Flowable.empty()
}
ret.concatWith(Completable.timer(period, unit).andThen(Completable.fromAction {
// Once the timer successfully expires, reset the state
doEmit.set(true)
}).toFlowable())
}
}
}
}
那么只需应用转换器即可:source.compose(ReduceThrottle(10, TimeUnit.SECONDS))
。
这可能会满足您的需要
source.debounce(item -> Observable.timer(10,TimeUnit.SECONDS))
我想以这样一种方式转换我的源 Flowable,即只有当它们是指定时间段内的第一个项目时事件才会通过。
也就是说,我希望第一个项目通过,然后丢弃所有后续项目,直到有一段时间(比如 10 秒)没有上游事件到达。
注意这两者都不是
debounce
:这将发出每个项目,前提是它没有跟随另一个项目 10 秒 - 但这将强制甚至第一个项目延迟 10 秒。我想立即发出第一个项目。throttleFirst
:这将发出第一个项目,然后在第一个项目之后丢弃所有后续项目 10 秒。我想在每个上游项目之后重置阻塞期。
我现在是这样解决的:
source
.flatMap { Flowable.just(1).concatWith(Flowable.just(-1).delay(10, TimeUnit.SECONDS)) }
.scan(0, { x, y -> x + y })
.map { it > 0 }
.distinctUntilChanged()
.filter { it }
注意:我不关心 source
中的实际项目,只关心它们出现 - 但是,当然,我可以将项目与 Pair
一起包装起来15=] 或 -1
).
有没有更简单的方法使用内置的 RxJava(2) 运算符来实现相同的目标?
可以使用 switchMap
一次只订阅一个 Flowable
并使用布尔值检查是否必须发出:
class ReduceThrottle<T>(val period: Long, val unit: TimeUnit) : FlowableTransformer<T, T> {
override fun apply(upstream: Flowable<T>): Publisher<T> {
return Flowable.defer {
val doEmit = AtomicBoolean(true)
upstream.switchMap { item ->
val ret = if (doEmit.compareAndSet(true, false)) {
// We haven't emitted in the last 10 seconds, do the emission
Flowable.just(item)
} else {
Flowable.empty()
}
ret.concatWith(Completable.timer(period, unit).andThen(Completable.fromAction {
// Once the timer successfully expires, reset the state
doEmit.set(true)
}).toFlowable())
}
}
}
}
那么只需应用转换器即可:source.compose(ReduceThrottle(10, TimeUnit.SECONDS))
。
这可能会满足您的需要
source.debounce(item -> Observable.timer(10,TimeUnit.SECONDS))