RxJS 限制相同的值但让新值通过
RxJS throttle same value but let new values through
"Here you have",有人说,你得到了这个输入值流,你有点想在...
上执行 distinctUntilChanged()
Input: '1-1----11---2--1122----1---2---2-2-1-2---|'
Output: '1-----------2--1-2-----1---2-------1-2---|'
到目前为止没有什么奇怪的,
但是现在有人说 "it's okey" 如果相同的值再次出现,"but only if it's not to soon!"。我想要至少 '----'
相同值之间的滴答声。 "Okey"你说你加个油门
const source = new Subject<number>();
// mysterious cave troll is randomly source.next(oneOrTwo)
const example = source.pipe(throttle(val => interval(4000)));
Input: '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
"That's not what I want! Look at all the value you missed",指的是您对流式传输的所有值进行了限制。
Input: '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
'-------------->1<--------->2<----->1<------|' <-- Missed values
"Here, let me show show you"神秘人说给你这个
想要输出
Input: '1-1----11---2--1112----1---2---2-2-1-2-----|'
Output: '1------1----2--1--2----1---2-----2-1-------|'
我对此的回答是,感觉合并 window 不行。
来自更有经验的人,
这是一个很难解决的问题吗?(或者我错过了一个明显的解决方案)
我没想到,您想按时间间隔进行缓冲,然后在每个缓冲区中区分。
实际上你想每 n 毫秒重新启动/重新启动不同的 运行。
source.pipe(
bufferTime(ms),
mergeMap(bufferArray => from(bufferArray).pipe(distinctUntilChanged()) )
)
这是我的第二次尝试,它通过输出过滤流(而不是采用 distinctUntil),然后限制并合并两个流。
当然,我们可能没有一组已知的值 (1,2,...n)。
如果我能弄清楚这个问题,我会再举一个例子。
const output = merge(
source.pipe( filter(x => x === 1), throttle(val => interval(ms))),
source.pipe( filter(x => x === 2), throttle(val => interval(ms)))
)
这是我的支票 (ms = 4000)
input 1-1----11---2--1112----1---2---2-2-1-2-----
expected 1------1----2--1--2----1---2-----2-1-------
filter(1) 1-1----11------111-----1-----------1-------
throttle(1) 1------1-------1-------1-----------1-------
filter(2) ------------2-----2--------2---2-2---2-----
throttle(2) ------------2-----2--------2-----2---------
merged 1------1----2--1--2----1---2-----2-1-------
expected 1------1----2--1--2----1---2-----2-1-------
扩展到 n 个值
我认为这适用于事先不知道流中的值集的情况(或者范围很大,因此扩展以前的答案是不切实际的)。
只要源代码完成,它就应该可以工作。
merge(
source.pipe(
distinct().pipe(
mapTo(distinctVal => source.pipe(
filter(val = val === distinctVal),
throttle(val => interval(ms))
)
)
)
)
我还没有证据,post 下一个。
首先我想出了以某种方式组合 distinctUntilChanged()
和 throttleTimte()
的想法,但是我无法想出解决方案,然后我尝试了其他方法。
我想出的运算符是 throttleDistinct()
,可以按您的意愿工作:StackBlit Editor Link
它有 2 个参数,它们是:
duration: number
以毫秒为单位,类似于
持续时间 throttleTime(duration: number)
equals: (a: T, b: T) => boolean
比较上一项是否等于下一项的函数,有默认值
(a, b) => a === b
的实施
import { of, fromEvent, interval, Observable } from 'rxjs';
import { map, scan, filter, } from 'rxjs/operators';
const source = fromEvent(document, 'keypress')
.pipe(map((x: any) => x.keyCode as number))
source
.pipe(
throttleDistinct(1000),
)
.subscribe((x) => console.log('__subscribe__', x));
export function throttleDistinct<T>(
duration: number,
equals: (a: T, b: T) => boolean = (a, b) => a === b
) {
return (source: Observable<T>) => {
return source
.pipe(
map((x) => {
const obj = { val: x, time: Date.now(), keep: true };
return obj;
}),
scan((acc, cur) => {
const diff = cur.time - acc.time;
const isSame = equals(acc.val, cur.val)
return diff > duration || (diff < duration && !isSame)
? { ...cur, keep: true }
: { ...acc, keep: false };
}),
filter((x) => x.keep),
map((x) => x.val),
)
}
}
这是一个基于算子理论的棘手解决方案,但我不确定它是否真的有效,因为我需要先模拟源发射。
所以 throttle 和不同的流总是缓存最新的值,zip 确保它们总是成对发射,zip 总是在任何流发射时发射,因为它是 shareReplay(1)。
我们总是从 distinctStream 中获取值,即使 zip 流被 throttle 触发,因为 distinctStream 总是有最后缓存的值。
const throttleStream= source.pipe(throttle(val => interval(4000)),shareReplay(1))
const distinctStream= source.pipe(distinctUntilChanged(),shareReplay(1))
zip(throttleStream,distinctStream).pipe(
map((t,d)=>d)
)
我找到了一个有效的解决方案,有人对此有任何看法吗?
source.pipe(
windowTime(4000),
concatMap(obs => obs.pipe(distinct()))
);
之前的示例,在 StackBlitz example
更新: 这实际上并不是 100% 有效。它只考虑当前的 window。所以你可以有
`[1-12][2---]` which would give `1--22---|`
其中 [----]
代表时间 window。换句话说,如果一个值首先在一个 window 中最后发出,并在下一个 window 中首先发出,则相同的值将紧接着彼此传递。
感谢@eric99 让我意识到这一点。
"Here you have",有人说,你得到了这个输入值流,你有点想在...
上执行 distinctUntilChanged()Input: '1-1----11---2--1122----1---2---2-2-1-2---|'
Output: '1-----------2--1-2-----1---2-------1-2---|'
到目前为止没有什么奇怪的,
但是现在有人说 "it's okey" 如果相同的值再次出现,"but only if it's not to soon!"。我想要至少 '----'
相同值之间的滴答声。 "Okey"你说你加个油门
const source = new Subject<number>();
// mysterious cave troll is randomly source.next(oneOrTwo)
const example = source.pipe(throttle(val => interval(4000)));
Input: '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
"That's not what I want! Look at all the value you missed",指的是您对流式传输的所有值进行了限制。
Input: '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
'-------------->1<--------->2<----->1<------|' <-- Missed values
"Here, let me show show you"神秘人说给你这个
想要输出
Input: '1-1----11---2--1112----1---2---2-2-1-2-----|'
Output: '1------1----2--1--2----1---2-----2-1-------|'
我对此的回答是,感觉合并 window 不行。
来自更有经验的人,
这是一个很难解决的问题吗?(或者我错过了一个明显的解决方案)
我没想到,您想按时间间隔进行缓冲,然后在每个缓冲区中区分。
实际上你想每 n 毫秒重新启动/重新启动不同的 运行。
source.pipe(
bufferTime(ms),
mergeMap(bufferArray => from(bufferArray).pipe(distinctUntilChanged()) )
)
这是我的第二次尝试,它通过输出过滤流(而不是采用 distinctUntil),然后限制并合并两个流。
当然,我们可能没有一组已知的值 (1,2,...n)。
如果我能弄清楚这个问题,我会再举一个例子。
const output = merge(
source.pipe( filter(x => x === 1), throttle(val => interval(ms))),
source.pipe( filter(x => x === 2), throttle(val => interval(ms)))
)
这是我的支票 (ms = 4000)
input 1-1----11---2--1112----1---2---2-2-1-2-----
expected 1------1----2--1--2----1---2-----2-1-------
filter(1) 1-1----11------111-----1-----------1-------
throttle(1) 1------1-------1-------1-----------1-------
filter(2) ------------2-----2--------2---2-2---2-----
throttle(2) ------------2-----2--------2-----2---------
merged 1------1----2--1--2----1---2-----2-1-------
expected 1------1----2--1--2----1---2-----2-1-------
扩展到 n 个值
我认为这适用于事先不知道流中的值集的情况(或者范围很大,因此扩展以前的答案是不切实际的)。
只要源代码完成,它就应该可以工作。
merge(
source.pipe(
distinct().pipe(
mapTo(distinctVal => source.pipe(
filter(val = val === distinctVal),
throttle(val => interval(ms))
)
)
)
)
我还没有证据,post 下一个。
首先我想出了以某种方式组合 distinctUntilChanged()
和 throttleTimte()
的想法,但是我无法想出解决方案,然后我尝试了其他方法。
我想出的运算符是 throttleDistinct()
,可以按您的意愿工作:StackBlit Editor Link
它有 2 个参数,它们是:
duration: number
以毫秒为单位,类似于 持续时间throttleTime(duration: number)
equals: (a: T, b: T) => boolean
比较上一项是否等于下一项的函数,有默认值(a, b) => a === b
的实施
import { of, fromEvent, interval, Observable } from 'rxjs';
import { map, scan, filter, } from 'rxjs/operators';
const source = fromEvent(document, 'keypress')
.pipe(map((x: any) => x.keyCode as number))
source
.pipe(
throttleDistinct(1000),
)
.subscribe((x) => console.log('__subscribe__', x));
export function throttleDistinct<T>(
duration: number,
equals: (a: T, b: T) => boolean = (a, b) => a === b
) {
return (source: Observable<T>) => {
return source
.pipe(
map((x) => {
const obj = { val: x, time: Date.now(), keep: true };
return obj;
}),
scan((acc, cur) => {
const diff = cur.time - acc.time;
const isSame = equals(acc.val, cur.val)
return diff > duration || (diff < duration && !isSame)
? { ...cur, keep: true }
: { ...acc, keep: false };
}),
filter((x) => x.keep),
map((x) => x.val),
)
}
}
这是一个基于算子理论的棘手解决方案,但我不确定它是否真的有效,因为我需要先模拟源发射。
所以 throttle 和不同的流总是缓存最新的值,zip 确保它们总是成对发射,zip 总是在任何流发射时发射,因为它是 shareReplay(1)。
我们总是从 distinctStream 中获取值,即使 zip 流被 throttle 触发,因为 distinctStream 总是有最后缓存的值。
const throttleStream= source.pipe(throttle(val => interval(4000)),shareReplay(1))
const distinctStream= source.pipe(distinctUntilChanged(),shareReplay(1))
zip(throttleStream,distinctStream).pipe(
map((t,d)=>d)
)
我找到了一个有效的解决方案,有人对此有任何看法吗?
source.pipe(
windowTime(4000),
concatMap(obs => obs.pipe(distinct()))
);
之前的示例,在 StackBlitz example
更新: 这实际上并不是 100% 有效。它只考虑当前的 window。所以你可以有
`[1-12][2---]` which would give `1--22---|`
其中 [----]
代表时间 window。换句话说,如果一个值首先在一个 window 中最后发出,并在下一个 window 中首先发出,则相同的值将紧接着彼此传递。
感谢@eric99 让我意识到这一点。