每 1000 毫秒间隔获取一个事件
Get one event per 1000 milliseconds interval
我有无限的事件流,可以发出一些连续的事件部分,我想每毫秒每 1000 次接收一个事件。
我尝试了 debounceTime
/ auditTime
/ throttleTime
但它们不包括我想要的所有事件 - 以演示我创建的 playground on stackblitz 触发事件一次的行为在 10 个事件的一部分内每 300 毫秒:
debounceTime(1000)
只会给出事件 #10
throttleTime(1000)
将给出事件 1,5,9
但它会省略 #10
这是必不可少的
auditTime(1000)
将给予事件 4,8
我在这里想要的是获取事件 1,5,9,10
(每 1000 毫秒间隔一个事件)。我该如何实现?
const events$ = interval(300).pipe(
map(num => `Firing event ${num + 1}`)
);
const source = events$.pipe(
tap(console.log),
take(10),
// make some debouncing??
map(x => `Received ${x}!`)
);
source.subscribe(x =>
console.log(
"%c" + x,
"background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
)
);
我还尝试使用 zip
/ combineLatest
并通过间隔 but no luck with that
发出值
这是一种方法:
const events$ = interval(300).pipe(
map(num => `Firing event ${num + 1}`),
share(),
);
const source$ = events$.pipe(
take(10),
);
const lastValue$ = source$.pipe(last());
merge(source$.pipe(throttleTime(1000)), lastValue$).subscribe(console.log);
share()
将确保源(interval
产生的可观察对象)仅订阅一次
last()
将 return 最新的 nexted 值,当它的源(source$
在这种情况下)由于 take(10)
编辑
根据我对问题的理解,这将是另一种选择:
const events$ = merge(
timer(300), // 1
timer(600), // 2
timer(900), // 3
timer(1301), // 4
timer(1500), // 5
timer(1800), // 6
timer(2302), // 7
).pipe(
map((_, num) => `Firing event ${num + 1}`),
share(),
);
const lastValue$ = events$.pipe(debounceTime(1000));
const source$ = events$.pipe(
throttleTime(1000),
buffer(lastValue$)
)
.subscribe(console.log)
debounceTime(1000)
- 如果 1s 过去了而没有任何通知
buffer(lastValue$)
- 当lastValue$
发出时,发送收集到的值
已更新
基于评论中的讨论
const events$ = timer(0, 6000).pipe(
take(3),
switchMap(x =>
timer(0, 300).pipe(
map(num => `event #${num + 1}`),
take(x > 1 ? 9 : 10)
)
)
);
const source = merge(
events$.pipe(
tap(e => console.log(`%cStream: ${e}`, "color: blue;")),
debounceTime(1000),
tap(x => console.log(`%cdebounceTime captured: ${x}`, "color: red;"))
),
events$.pipe(
throttleTime(1000),
tap(x => console.log(`%cthrottleTime captured: ${x}`, "color: green;"))
),
).pipe(
// we need to avoid duplicates (like case with 9).
// if all events aren't unique you need to use the original solution below.
distinctUntilChanged(), // <-- if all events are unique.
map(x => `Received ${x}!`)
);
source.subscribe(x =>
console.log(
"%c" + x,
"background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
)
);
原创
我希望这就是你想要的:https://take.ms/VP7tA
const events$ = interval(300).pipe(
map(num => `Firing event ${num + 1}`)
);
const source = concat(events$.pipe(
tap(console.log),
take(10),
), timer(1000).pipe(switchMapTo(EMPTY)), events$.pipe(
tap(console.log),
take(10),
));
let lastTimer = 0;
const last$ = new Subject<number>();
merge(
source.pipe(
scan((state, event) => {
state[1] = null;
const stamp = new Date().getTime();
clearTimeout(lastTimer);
if (stamp - state[0] < 1000) {
lastTimer = setTimeout(() => last$.next(event), (stamp - state[0]) + 50);
return state;
}
state[0] = stamp;
state[1] = event;
return state;
}, [0, null]),
filter(([, event]) => event !== null),
map(([, event]) => event || 0),
),
last$,
).pipe(
map(x => `Received ${JSON.stringify(x)}!`)
).subscribe(x =>
console.log(
"%c" + x,
"background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
)
);
我有无限的事件流,可以发出一些连续的事件部分,我想每毫秒每 1000 次接收一个事件。
我尝试了 debounceTime
/ auditTime
/ throttleTime
但它们不包括我想要的所有事件 - 以演示我创建的 playground on stackblitz 触发事件一次的行为在 10 个事件的一部分内每 300 毫秒:
debounceTime(1000)
只会给出事件 #10
throttleTime(1000)
将给出事件1,5,9
但它会省略 #10
这是必不可少的auditTime(1000)
将给予事件4,8
我在这里想要的是获取事件 1,5,9,10
(每 1000 毫秒间隔一个事件)。我该如何实现?
const events$ = interval(300).pipe(
map(num => `Firing event ${num + 1}`)
);
const source = events$.pipe(
tap(console.log),
take(10),
// make some debouncing??
map(x => `Received ${x}!`)
);
source.subscribe(x =>
console.log(
"%c" + x,
"background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
)
);
我还尝试使用 zip
/ combineLatest
并通过间隔 but no luck with that
这是一种方法:
const events$ = interval(300).pipe(
map(num => `Firing event ${num + 1}`),
share(),
);
const source$ = events$.pipe(
take(10),
);
const lastValue$ = source$.pipe(last());
merge(source$.pipe(throttleTime(1000)), lastValue$).subscribe(console.log);
share()
将确保源(interval
产生的可观察对象)仅订阅一次
last()
将 return 最新的 nexted 值,当它的源(source$
在这种情况下)由于 take(10)
编辑
根据我对问题的理解,这将是另一种选择:
const events$ = merge(
timer(300), // 1
timer(600), // 2
timer(900), // 3
timer(1301), // 4
timer(1500), // 5
timer(1800), // 6
timer(2302), // 7
).pipe(
map((_, num) => `Firing event ${num + 1}`),
share(),
);
const lastValue$ = events$.pipe(debounceTime(1000));
const source$ = events$.pipe(
throttleTime(1000),
buffer(lastValue$)
)
.subscribe(console.log)
debounceTime(1000)
- 如果 1s 过去了而没有任何通知buffer(lastValue$)
- 当lastValue$
发出时,发送收集到的值
已更新
基于评论中的讨论
const events$ = timer(0, 6000).pipe(
take(3),
switchMap(x =>
timer(0, 300).pipe(
map(num => `event #${num + 1}`),
take(x > 1 ? 9 : 10)
)
)
);
const source = merge(
events$.pipe(
tap(e => console.log(`%cStream: ${e}`, "color: blue;")),
debounceTime(1000),
tap(x => console.log(`%cdebounceTime captured: ${x}`, "color: red;"))
),
events$.pipe(
throttleTime(1000),
tap(x => console.log(`%cthrottleTime captured: ${x}`, "color: green;"))
),
).pipe(
// we need to avoid duplicates (like case with 9).
// if all events aren't unique you need to use the original solution below.
distinctUntilChanged(), // <-- if all events are unique.
map(x => `Received ${x}!`)
);
source.subscribe(x =>
console.log(
"%c" + x,
"background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
)
);
原创
我希望这就是你想要的:https://take.ms/VP7tA
const events$ = interval(300).pipe(
map(num => `Firing event ${num + 1}`)
);
const source = concat(events$.pipe(
tap(console.log),
take(10),
), timer(1000).pipe(switchMapTo(EMPTY)), events$.pipe(
tap(console.log),
take(10),
));
let lastTimer = 0;
const last$ = new Subject<number>();
merge(
source.pipe(
scan((state, event) => {
state[1] = null;
const stamp = new Date().getTime();
clearTimeout(lastTimer);
if (stamp - state[0] < 1000) {
lastTimer = setTimeout(() => last$.next(event), (stamp - state[0]) + 50);
return state;
}
state[0] = stamp;
state[1] = event;
return state;
}, [0, null]),
filter(([, event]) => event !== null),
map(([, event]) => event || 0),
),
last$,
).pipe(
map(x => `Received ${JSON.stringify(x)}!`)
).subscribe(x =>
console.log(
"%c" + x,
"background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
)
);