Rx - 如何应用于允许在一段时间后发出重复项的流 distinctUntilChanged?
Rx - How to apply to stream distinctUntilChanged that allows emit duplicate item after some period?
因此,我必须为 Observable 实现一些链或一些自定义 RxJava 运算符,这将区分从 Observable 发出的项目,直到它们发生变化,但只是短暂的周期(如 1 秒),然后可以再次发出重复的项目。
我需要的是 distinctUntilChanged 和 throttle?
的一些嵌套组合
主要要求是:
- 必须立即发射不同的物品
- 同一物品不能在给定时间内发射两次
我找不到任何符合我要求的运算符,所以我可能需要编写一些自定义 Rx 运算符,但我仍然不知道如何开始
您只需使用 groupBy
运算符即可完成此操作。由于每个 group$
都通过 throttleTime
进行管道传输,并且每个发射都经过此 group$
Observable 它将忽略所有后续发射 1 秒:
source$
.pipe(
groupBy(item => item.whatever),
mergeMap(group$ => group$.pipe(
throttleTime(1000)
)),
)
.subscribe(...);
所以,我想通了,原来很容易实现:
fun <T> Observable<T>.distinctUntil(time: Long): Observable<T> {
return this
.timestamp()
.distinctUntilChanged { old, new ->
new.value() == old.value() && new.time() - old.time() < time
}
.map { it.value() }
}
因此,我必须为 Observable 实现一些链或一些自定义 RxJava 运算符,这将区分从 Observable 发出的项目,直到它们发生变化,但只是短暂的周期(如 1 秒),然后可以再次发出重复的项目。
我需要的是 distinctUntilChanged 和 throttle?
的一些嵌套组合主要要求是:
- 必须立即发射不同的物品
- 同一物品不能在给定时间内发射两次
我找不到任何符合我要求的运算符,所以我可能需要编写一些自定义 Rx 运算符,但我仍然不知道如何开始
您只需使用 groupBy
运算符即可完成此操作。由于每个 group$
都通过 throttleTime
进行管道传输,并且每个发射都经过此 group$
Observable 它将忽略所有后续发射 1 秒:
source$
.pipe(
groupBy(item => item.whatever),
mergeMap(group$ => group$.pipe(
throttleTime(1000)
)),
)
.subscribe(...);
所以,我想通了,原来很容易实现:
fun <T> Observable<T>.distinctUntil(time: Long): Observable<T> {
return this
.timestamp()
.distinctUntilChanged { old, new ->
new.value() == old.value() && new.time() - old.time() < time
}
.map { it.value() }
}