源发射速率快时的 Rxjs 缓冲区
Rxjs buffer when source emitions rate is fast
比方说,我有一个每秒 x0 次发射(可能是 50、60 次……)的可观察到的发射,有时它只是每秒发射 1 次或 2 次。
现在我怎样才能缓冲那些快速发射并仍然处理慢速发射。
我累的是:
BufferTime 需要一个时间跨度,因此即使一次发射也会被缓冲,(加上 BufferTime 会使量角器测试超时)。
BufferCount(x) 在收到所有 x 次发射后才会发射。
听起来你想要类似于 debounce + buffer 的东西。最简单的实现是使用流的反跳来触发发出同一流的缓冲区。您可能希望共享流以防止重复订阅。这是一个 运行 示例:
const source = new Rx.Observable.create((o) => {
let count = 0;
const emit = () => {
const timeout = Math.random() * 1000;
setTimeout(() => {
o.next(count++);
if (count < 20) {
emit();
} else {
o.complete();
}
}, timeout);
};
emit();
}).share();
const triggerBuffer = source.debounceTime(500);
source.buffer(triggerBuffer).subscribe((x) => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.9/Rx.min.js"></script>
请注意,去抖没有上限,因为如果它在去抖时间内继续接收值,它就不会发出。实际上,这不会对您的场景产生影响,但在其他场景中,理论上可以。
如 bygrace 所述,您要找的是 debounce + buffer
在现代 RXJS 6 和 ES6 Typescript 中添加类型推断,我创建了一个自定义 OperatorFunction 来非常简单地做到这一点,称为 bufferDebounce
.
type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
const bufferDebounce: BufferDebounce = debounce => source =>
new Observable(observer =>
source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
next(x) {
observer.next(x);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
})
// [as many sources until no emit during 500ms]
source.pipe(bufferDebounce(500)).subscribe(console.log);
你可以在这里看到一个工作示例https://stackblitz.com/edit/rxjs6-buffer-debounce
希望这对您和任何新人有所帮助。
比方说,我有一个每秒 x0 次发射(可能是 50、60 次……)的可观察到的发射,有时它只是每秒发射 1 次或 2 次。
现在我怎样才能缓冲那些快速发射并仍然处理慢速发射。
我累的是:
BufferTime 需要一个时间跨度,因此即使一次发射也会被缓冲,(加上 BufferTime 会使量角器测试超时)。
BufferCount(x) 在收到所有 x 次发射后才会发射。
听起来你想要类似于 debounce + buffer 的东西。最简单的实现是使用流的反跳来触发发出同一流的缓冲区。您可能希望共享流以防止重复订阅。这是一个 运行 示例:
const source = new Rx.Observable.create((o) => {
let count = 0;
const emit = () => {
const timeout = Math.random() * 1000;
setTimeout(() => {
o.next(count++);
if (count < 20) {
emit();
} else {
o.complete();
}
}, timeout);
};
emit();
}).share();
const triggerBuffer = source.debounceTime(500);
source.buffer(triggerBuffer).subscribe((x) => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.9/Rx.min.js"></script>
请注意,去抖没有上限,因为如果它在去抖时间内继续接收值,它就不会发出。实际上,这不会对您的场景产生影响,但在其他场景中,理论上可以。
如 bygrace 所述,您要找的是 debounce + buffer
在现代 RXJS 6 和 ES6 Typescript 中添加类型推断,我创建了一个自定义 OperatorFunction 来非常简单地做到这一点,称为 bufferDebounce
.
type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
const bufferDebounce: BufferDebounce = debounce => source =>
new Observable(observer =>
source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
next(x) {
observer.next(x);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
})
// [as many sources until no emit during 500ms]
source.pipe(bufferDebounce(500)).subscribe(console.log);
你可以在这里看到一个工作示例https://stackblitz.com/edit/rxjs6-buffer-debounce
希望这对您和任何新人有所帮助。