Rx - 如何应用于允许在一段时间后发出重复项的流 distinctUntilChanged?

Rx - How to apply to stream distinctUntilChanged that allows emit duplicate item after some period?

因此,我必须为 Observable 实现一些链或一些自定义 RxJava 运算符,这将区分从 Observable 发出的项目,直到它们发生变化,但只是短暂的周期(如 1 秒),然后可以再次发出重复的项目。

我需要的是 distinctUntilChangedthrottle?

的一些嵌套组合

主要要求是:

我找不到任何符合我要求的运算符,所以我可能需要编写一些自定义 Rx 运算符,但我仍然不知道如何开始

您只需使用 groupBy 运算符即可完成此操作。由于每个 group$ 都通过 throttleTime 进行管道传输,并且每个发射都经过此 group$ Observable 它将忽略所有后续发射 1 秒:

source$
  .pipe(
    groupBy(item => item.whatever),
    mergeMap(group$ => group$.pipe(
      throttleTime(1000)
    )),
  )
  .subscribe(...);

所以,我想通了,原来很容易实现:

fun <T> Observable<T>.distinctUntil(time: Long): Observable<T> {
    return this
        .timestamp()
        .distinctUntilChanged { old, new ->
            new.value() == old.value() && new.time() - old.time() < time
        }
        .map { it.value() }
}