RxJS 中第一个值后的 DebounceTime

DebounceTime after first value in RxJS

我需要 RxJS 运算符无法实现的特定行为。最接近的是只对第一个之后输入的值使用 DebounceTime,但我找不到办法做到这一点。我也尝试过使用 ThrottleTime,但它并不是我想要的,因为它启动了中间调用,我只想要一个在开始时是瞬时的,在结束时另一个,没有别的。

油门时间

throttleTime(12 ticks, { leading: true, trailing: true })

source:             --0--1-----2--3----4--5-6---7------------8-------9---------
throttle interval:  --[~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~]--------
output:             --0-----------3-----------6-----------7-----------9--------


source_2:           --0--------1------------------2--------------3---4---------
throttle interval:  --[~~~~~~~~~~~I~~~~~~~~~~~]---[~~~~~~~~~~~]--[~~~~~~~~~~~I~
output_2:           --0-----------1---------------2--------------3-----------4-

去抖时间

debounceTime(500)

source:             --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval:  -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output:             -----------1--------3--------------------------------13---------

我想要什么

debounceTimeAfterFirst(500) (?)

source:             --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval:  -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output:             --0--------1--3------------4-------------------------13---------

如您所见,输入新值时会激活去抖时间。如果去抖动时间过去并且输入了任何新值,它会停止侦听 debounceTime 动作并等待开始新的动作。

Edit: 忘记评论了,这个一定要和NgRx的Effects整合,所以一定是一个连续流,必须'完成。终止它可能会导致它停止侦听已调度的操作。

我认为你可以像下面那样做,尽管我现在想不出任何更简单的解决方案(我假设你正在使用带有 connect() 运算符的 RxJS 7+):

connect(shared$ => shared$.pipe(
  exhaustMap(value => merge(
    of(value),
    shared$.pipe(debounceTime(1000)),
  ).pipe(
    take(2),
  )),
)),

现场演示:https://stackblitz.com/edit/rxjs-qwoesj?devtoolsheight=60&file=index.ts

connect() 将共享源 Observable 并允许您在其项目函数中多次重用它。我使用它只是为了在另一个链中使用源 Observable。

exhaustMap() 将忽略所有 next 通知,直到其内部 Observable 完成。在这种情况下,内部 Observable 将立即重新发出当前值 (of(value)),然后使用 debounceTime()exhaustMap() 忽略来自源的任何后续发射,因为内部 Observable 尚未完成但也被传递给 debounceTime()。然后 take(2) 用于在 debounceTime() 发出后完成链,并且整个过程可以在源发出时重复,因为 exhaustMap() 不会忽略 next 通知(它的内部 Observable 有完成)。

这是一个自定义运算符(据我所知)它可以满足您的需求。

这里的两个关键见解是:

  1. 使用 connect 以便您可以订阅源两次,一次使用 exhaustMap 忽略发射,另一次使用 switchMap
  2. 检查和去抖发射
  3. 创建一个内部令牌,以便您知道何时在没有去抖发射的情况下退出。 (确保从上面的示例中,仍然发出 4)。
function throttleDebounceTime<T>(interval: number): MonoTypeOperatorFunction<T> {
  // Use this token's memory address as a nominal token
  const resetToken = {};

  return connect(s$ => s$.pipe(
    exhaustMap(a => s$.pipe(
      startWith(resetToken),
      switchMap(b => timer(interval).pipe(mapTo(b))),
      take(1),
      filter<T>(c => c !== resetToken),
      startWith(a)
    ))
  ));
}

示例:

of(1,2,3,4).pipe(
  throttleDebounceTime(500)
).subscribe(console.log);

// 1 [...0.5s wait] 4

我会使用 throttle 结合 debounceTime:

  • throttle: from Documentation 在间隔的前缘发出值,但抑制新值,直到 durationSelector 完成。

  • debounceTime: from Documentation 丢弃在输出之间花费少于指定时间的发射值.

我会使用节流流来获得上升沿(第一次发射),然后去抖动流会给我们下降沿。

const source = fromEvent(document.getElementsByTagName('input'), 'keyup').pipe(
  pluck('target', 'value')
);

const debounced = source.pipe(
  debounceTime(4000),
  map((v) => `[d] ${v}`)
);

const effect = merge(
  source.pipe(
    throttle((val) => debounced),
    map((v) => `[t] ${v}`)
  ),
  debounced
);

effect.subscribe(console.log);

在打开控制台的情况下查看 RxJS StackBlitz 以查看值的变化。

我准备了设置以适应您提到的 NgRx。我得到的效果是:

@Injectable({ providedIn: 'root' })
export class FooEffects {
  switchLight$ = createEffect(() => {
    const source = this.actions$.pipe(
      ofType('[App] Switch Light'),
      pluck('onOrOff'),
      share()
    );
    const debounced = source.pipe(debounceTime(1000), share());
    return merge(source.pipe(throttle((val) => debounced)), debounced).pipe(
      map((onOrOff) => SetLightStatus({ onOrOff }))
    );
  });

  constructor(private actions$: Actions) {}
}

请参阅 NgRx StackBlitz,其中建议的解决方案在 Angular NgRx 应用程序的上下文中工作。

  • share:这个运算符阻止下游路径同时从链上一直获取数据,而是从你放置共享的点获取数据。

我也尝试采用@martin 的connect() 方法。但我不知道@martin 将如何“重置”系统,以便在很长一段时间后,如果发出一个新的源值,它不会像你第一次 运行 那样去抖动它,@martin,随意分叉并调整它以使其工作,我很好奇你的方法,这是非常聪明的。我不知道 connect()

@avicarpio 在您的申请中试一试,让我们知道进展情况:)