RxJS 中第一个值后的 DebounceTime
DebounceTime after first value in RxJS
我需要 RxJS 运算符无法实现的特定行为。最接近的是只对第一个之后输入的值使用 DebounceTime,但我找不到办法做到这一点。我也尝试过使用 ThrottleTime,但它并不是我想要的,因为它启动了中间调用,我只想要一个在开始时是瞬时的,在结束时另一个,没有别的。
油门时间
throttleTime(12 ticks, { leading: true, trailing: true })
source: --0--1-----2--3----4--5-6---7------------8-------9---------
throttle interval: --[~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~]--------
output: --0-----------3-----------6-----------7-----------9--------
source_2: --0--------1------------------2--------------3---4---------
throttle interval: --[~~~~~~~~~~~I~~~~~~~~~~~]---[~~~~~~~~~~~]--[~~~~~~~~~~~I~
output_2: --0-----------1---------------2--------------3-----------4-
去抖时间
debounceTime(500)
source: --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval: -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output: -----------1--------3--------------------------------13---------
我想要什么
debounceTimeAfterFirst(500) (?)
source: --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval: -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output: --0--------1--3------------4-------------------------13---------
如您所见,输入新值时会激活去抖时间。如果去抖动时间过去并且输入了任何新值,它会停止侦听 debounceTime 动作并等待开始新的动作。
Edit: 忘记评论了,这个一定要和NgRx的Effects整合,所以一定是一个连续流,必须'完成。终止它可能会导致它停止侦听已调度的操作。
我认为你可以像下面那样做,尽管我现在想不出任何更简单的解决方案(我假设你正在使用带有 connect()
运算符的 RxJS 7+):
connect(shared$ => shared$.pipe(
exhaustMap(value => merge(
of(value),
shared$.pipe(debounceTime(1000)),
).pipe(
take(2),
)),
)),
现场演示:https://stackblitz.com/edit/rxjs-qwoesj?devtoolsheight=60&file=index.ts
connect()
将共享源 Observable 并允许您在其项目函数中多次重用它。我使用它只是为了在另一个链中使用源 Observable。
exhaustMap()
将忽略所有 next
通知,直到其内部 Observable 完成。在这种情况下,内部 Observable 将立即重新发出当前值 (of(value)
),然后使用 debounceTime()
。 exhaustMap()
忽略来自源的任何后续发射,因为内部 Observable 尚未完成但也被传递给 debounceTime()
。然后 take(2)
用于在 debounceTime()
发出后完成链,并且整个过程可以在源发出时重复,因为 exhaustMap()
不会忽略 next
通知(它的内部 Observable 有完成)。
这是一个自定义运算符(据我所知)它可以满足您的需求。
这里的两个关键见解是:
- 使用 connect 以便您可以订阅源两次,一次使用
exhaustMap
忽略发射,另一次使用 switchMap
检查和去抖发射
- 创建一个内部令牌,以便您知道何时在没有去抖发射的情况下退出。 (确保从上面的示例中,仍然发出 4)。
function throttleDebounceTime<T>(interval: number): MonoTypeOperatorFunction<T> {
// Use this token's memory address as a nominal token
const resetToken = {};
return connect(s$ => s$.pipe(
exhaustMap(a => s$.pipe(
startWith(resetToken),
switchMap(b => timer(interval).pipe(mapTo(b))),
take(1),
filter<T>(c => c !== resetToken),
startWith(a)
))
));
}
示例:
of(1,2,3,4).pipe(
throttleDebounceTime(500)
).subscribe(console.log);
// 1 [...0.5s wait] 4
我会使用 throttle
结合 debounceTime
:
throttle: from Documentation 在间隔的前缘发出值,但抑制新值,直到 durationSelector 完成。
debounceTime: from Documentation 丢弃在输出之间花费少于指定时间的发射值.
我会使用节流流来获得上升沿(第一次发射),然后去抖动流会给我们下降沿。
const source = fromEvent(document.getElementsByTagName('input'), 'keyup').pipe(
pluck('target', 'value')
);
const debounced = source.pipe(
debounceTime(4000),
map((v) => `[d] ${v}`)
);
const effect = merge(
source.pipe(
throttle((val) => debounced),
map((v) => `[t] ${v}`)
),
debounced
);
effect.subscribe(console.log);
在打开控制台的情况下查看 RxJS StackBlitz 以查看值的变化。
我准备了设置以适应您提到的 NgRx。我得到的效果是:
@Injectable({ providedIn: 'root' })
export class FooEffects {
switchLight$ = createEffect(() => {
const source = this.actions$.pipe(
ofType('[App] Switch Light'),
pluck('onOrOff'),
share()
);
const debounced = source.pipe(debounceTime(1000), share());
return merge(source.pipe(throttle((val) => debounced)), debounced).pipe(
map((onOrOff) => SetLightStatus({ onOrOff }))
);
});
constructor(private actions$: Actions) {}
}
请参阅 NgRx StackBlitz,其中建议的解决方案在 Angular NgRx 应用程序的上下文中工作。
- share:这个运算符阻止下游路径同时从链上一直获取数据,而是从你放置共享的点获取数据。
我也尝试采用@martin 的connect()
方法。但我不知道@martin 将如何“重置”系统,以便在很长一段时间后,如果发出一个新的源值,它不会像你第一次 运行 那样去抖动它,@martin,随意分叉并调整它以使其工作,我很好奇你的方法,这是非常聪明的。我不知道 connect()
。
@avicarpio 在您的申请中试一试,让我们知道进展情况:)
我需要 RxJS 运算符无法实现的特定行为。最接近的是只对第一个之后输入的值使用 DebounceTime,但我找不到办法做到这一点。我也尝试过使用 ThrottleTime,但它并不是我想要的,因为它启动了中间调用,我只想要一个在开始时是瞬时的,在结束时另一个,没有别的。
油门时间
throttleTime(12 ticks, { leading: true, trailing: true })
source: --0--1-----2--3----4--5-6---7------------8-------9---------
throttle interval: --[~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~]--------
output: --0-----------3-----------6-----------7-----------9--------
source_2: --0--------1------------------2--------------3---4---------
throttle interval: --[~~~~~~~~~~~I~~~~~~~~~~~]---[~~~~~~~~~~~]--[~~~~~~~~~~~I~
output_2: --0-----------1---------------2--------------3-----------4-
去抖时间
debounceTime(500)
source: --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval: -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output: -----------1--------3--------------------------------13---------
我想要什么
debounceTimeAfterFirst(500) (?)
source: --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval: -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output: --0--------1--3------------4-------------------------13---------
如您所见,输入新值时会激活去抖时间。如果去抖动时间过去并且输入了任何新值,它会停止侦听 debounceTime 动作并等待开始新的动作。
Edit: 忘记评论了,这个一定要和NgRx的Effects整合,所以一定是一个连续流,必须'完成。终止它可能会导致它停止侦听已调度的操作。
我认为你可以像下面那样做,尽管我现在想不出任何更简单的解决方案(我假设你正在使用带有 connect()
运算符的 RxJS 7+):
connect(shared$ => shared$.pipe(
exhaustMap(value => merge(
of(value),
shared$.pipe(debounceTime(1000)),
).pipe(
take(2),
)),
)),
现场演示:https://stackblitz.com/edit/rxjs-qwoesj?devtoolsheight=60&file=index.ts
connect()
将共享源 Observable 并允许您在其项目函数中多次重用它。我使用它只是为了在另一个链中使用源 Observable。
exhaustMap()
将忽略所有 next
通知,直到其内部 Observable 完成。在这种情况下,内部 Observable 将立即重新发出当前值 (of(value)
),然后使用 debounceTime()
。 exhaustMap()
忽略来自源的任何后续发射,因为内部 Observable 尚未完成但也被传递给 debounceTime()
。然后 take(2)
用于在 debounceTime()
发出后完成链,并且整个过程可以在源发出时重复,因为 exhaustMap()
不会忽略 next
通知(它的内部 Observable 有完成)。
这是一个自定义运算符(据我所知)它可以满足您的需求。
这里的两个关键见解是:
- 使用 connect 以便您可以订阅源两次,一次使用
exhaustMap
忽略发射,另一次使用switchMap
检查和去抖发射
- 创建一个内部令牌,以便您知道何时在没有去抖发射的情况下退出。 (确保从上面的示例中,仍然发出 4)。
function throttleDebounceTime<T>(interval: number): MonoTypeOperatorFunction<T> {
// Use this token's memory address as a nominal token
const resetToken = {};
return connect(s$ => s$.pipe(
exhaustMap(a => s$.pipe(
startWith(resetToken),
switchMap(b => timer(interval).pipe(mapTo(b))),
take(1),
filter<T>(c => c !== resetToken),
startWith(a)
))
));
}
示例:
of(1,2,3,4).pipe(
throttleDebounceTime(500)
).subscribe(console.log);
// 1 [...0.5s wait] 4
我会使用 throttle
结合 debounceTime
:
throttle: from Documentation 在间隔的前缘发出值,但抑制新值,直到 durationSelector 完成。
debounceTime: from Documentation 丢弃在输出之间花费少于指定时间的发射值.
我会使用节流流来获得上升沿(第一次发射),然后去抖动流会给我们下降沿。
const source = fromEvent(document.getElementsByTagName('input'), 'keyup').pipe(
pluck('target', 'value')
);
const debounced = source.pipe(
debounceTime(4000),
map((v) => `[d] ${v}`)
);
const effect = merge(
source.pipe(
throttle((val) => debounced),
map((v) => `[t] ${v}`)
),
debounced
);
effect.subscribe(console.log);
在打开控制台的情况下查看 RxJS StackBlitz 以查看值的变化。
我准备了设置以适应您提到的 NgRx。我得到的效果是:
@Injectable({ providedIn: 'root' })
export class FooEffects {
switchLight$ = createEffect(() => {
const source = this.actions$.pipe(
ofType('[App] Switch Light'),
pluck('onOrOff'),
share()
);
const debounced = source.pipe(debounceTime(1000), share());
return merge(source.pipe(throttle((val) => debounced)), debounced).pipe(
map((onOrOff) => SetLightStatus({ onOrOff }))
);
});
constructor(private actions$: Actions) {}
}
请参阅 NgRx StackBlitz,其中建议的解决方案在 Angular NgRx 应用程序的上下文中工作。
- share:这个运算符阻止下游路径同时从链上一直获取数据,而是从你放置共享的点获取数据。
我也尝试采用@martin 的connect()
方法。但我不知道@martin 将如何“重置”系统,以便在很长一段时间后,如果发出一个新的源值,它不会像你第一次 运行 那样去抖动它,@martin,随意分叉并调整它以使其工作,我很好奇你的方法,这是非常聪明的。我不知道 connect()
。
@avicarpio 在您的申请中试一试,让我们知道进展情况:)