去抖和缓冲 rxjs 订阅
Debounce and buffer an rxjs subscription
我有一个消息队列处理器,可以将消息提供给服务...
q.on("message", (m) => {
service.create(m)
.then(() => m.ack())
.catch(() => n.nack())
})
该服务使用 RxJS Observable 并订阅 debounceTime()
这些请求。
class Service {
constructor() {
this.subject = new Subject()
this.subject.debounceTime(1000)
.subscribe(({ req, resolve, reject }) =>
someOtherService.doWork(req)
.then(() => resolve())
.catch(() => reject())
)
}
create(req) {
return new Promise((resolve, reject) =>
this.subject.next({
req,
resolve,
reject
})
)
}
}
问题是只有去抖动的请求得到 ackd/nackd。如何确保订阅也 resolves/rejects 其他请求? bufferTime()
让我参与其中,但它不会重置每次调用 next()
.
的超时持续时间
您当前使用的 debounceTime
运算符可用于创建可通知 buffer
当前缓冲区何时应关闭的可观察对象。
然后,buffer
将发出一组在去抖动时收到的消息,您可以随心所欲地处理它们:
this.subject = new Subject();
const closingNotifier = this.subject.debounceTime(1000);
this.subject.buffer(closingNotifier).subscribe(messages => {
const last = messages.length - 1;
messages.forEach(({ req, resolve, reject }, index) => {
if (index === last) {
/* whatever you are doing, now, with the debounced message */
} else {
/* whatever you need to do with the ignored messages */
}
});
});
对于那些正在寻找 RXJS 6 解决方案的人,我创建了一个自定义运算符,其行为类似于上一个答案中的 debounce()
+ buffer()
。
我将其命名为 bufferDebounce
,带有类型推断的 Typescript 代码片段在此处:
import { Observable, OperatorFunction } from 'rxjs'
import { buffer, debounceTime } from 'rxjs/operators'
type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
const bufferDebounce: BufferDebounce = debounce => source =>
new Observable(observer =>
source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
next(x) {
observer.next(x);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
}),
);
您可以在此示例中测试它的行为以检查它是否适合您https://stackblitz.com/edit/rxjs6-buffer-debounce
我有一个消息队列处理器,可以将消息提供给服务...
q.on("message", (m) => {
service.create(m)
.then(() => m.ack())
.catch(() => n.nack())
})
该服务使用 RxJS Observable 并订阅 debounceTime()
这些请求。
class Service {
constructor() {
this.subject = new Subject()
this.subject.debounceTime(1000)
.subscribe(({ req, resolve, reject }) =>
someOtherService.doWork(req)
.then(() => resolve())
.catch(() => reject())
)
}
create(req) {
return new Promise((resolve, reject) =>
this.subject.next({
req,
resolve,
reject
})
)
}
}
问题是只有去抖动的请求得到 ackd/nackd。如何确保订阅也 resolves/rejects 其他请求? bufferTime()
让我参与其中,但它不会重置每次调用 next()
.
您当前使用的 debounceTime
运算符可用于创建可通知 buffer
当前缓冲区何时应关闭的可观察对象。
然后,buffer
将发出一组在去抖动时收到的消息,您可以随心所欲地处理它们:
this.subject = new Subject();
const closingNotifier = this.subject.debounceTime(1000);
this.subject.buffer(closingNotifier).subscribe(messages => {
const last = messages.length - 1;
messages.forEach(({ req, resolve, reject }, index) => {
if (index === last) {
/* whatever you are doing, now, with the debounced message */
} else {
/* whatever you need to do with the ignored messages */
}
});
});
对于那些正在寻找 RXJS 6 解决方案的人,我创建了一个自定义运算符,其行为类似于上一个答案中的 debounce()
+ buffer()
。
我将其命名为 bufferDebounce
,带有类型推断的 Typescript 代码片段在此处:
import { Observable, OperatorFunction } from 'rxjs'
import { buffer, debounceTime } from 'rxjs/operators'
type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
const bufferDebounce: BufferDebounce = debounce => source =>
new Observable(observer =>
source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
next(x) {
observer.next(x);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
}),
);
您可以在此示例中测试它的行为以检查它是否适合您https://stackblitz.com/edit/rxjs6-buffer-debounce