RxJs 缓冲直到数据库插入(承诺)
RxJs buffer until database insert (promise)
我有一个数据流,其中有快速传入的数据。我想通过保持顺序将它们插入数据库。我有一个数据库,其中 returns 一个承诺,在插入成功时解决。
我想制作一个 Rx 流,它缓冲新数据,直到插入缓冲数据。
我该怎么做?
我相信要获得您想要的东西,您需要创建自己的运算符。稍微脱离 RxJS 你可以得到类似的东西(警告,还没有测试)...
export class BusyBuffer<T> {
private itemQueue = new Subject<T>();
private bufferTrigger = new Subject<{}>();
private busy = false;
constructor(consumerCallback: (items: T[]) => Promise<void>) {
this.itemQueue.buffer(this.bufferTrigger).subscribe(items => {
this.busy = true;
consumerCallback(items).then(() => {
this.busy = false;
this.bufferTrigger.next(null);
});
});
}
submitItem(item: T) {
this.itemQueue.next(item);
if(!busy) {
this.bufferTrigger.next(null);
}
}
}
然后可以用作
let busyBuffer = new BusyBuffer<T>(items => {
return database.insertRecords(items);
});
items.subscribe(item => busyBuffer.submitItem(item));
虽然它不完全是纯粹的反应式,但有人可能会想出更好的东西。
我有一个数据流,其中有快速传入的数据。我想通过保持顺序将它们插入数据库。我有一个数据库,其中 returns 一个承诺,在插入成功时解决。
我想制作一个 Rx 流,它缓冲新数据,直到插入缓冲数据。
我该怎么做?
我相信要获得您想要的东西,您需要创建自己的运算符。稍微脱离 RxJS 你可以得到类似的东西(警告,还没有测试)...
export class BusyBuffer<T> {
private itemQueue = new Subject<T>();
private bufferTrigger = new Subject<{}>();
private busy = false;
constructor(consumerCallback: (items: T[]) => Promise<void>) {
this.itemQueue.buffer(this.bufferTrigger).subscribe(items => {
this.busy = true;
consumerCallback(items).then(() => {
this.busy = false;
this.bufferTrigger.next(null);
});
});
}
submitItem(item: T) {
this.itemQueue.next(item);
if(!busy) {
this.bufferTrigger.next(null);
}
}
}
然后可以用作
let busyBuffer = new BusyBuffer<T>(items => {
return database.insertRecords(items);
});
items.subscribe(item => busyBuffer.submitItem(item));
虽然它不完全是纯粹的反应式,但有人可能会想出更好的东西。