带背压的 Rxjs 缓冲区实现
Rxjs buffer implementation with backpressure
RxJS 新手问题来了!
所以我有这个基本缓冲区,它将 source1 和 source2 中的所有内容附加到一个数组。在某些情况下,缓冲区会被清除。
var buffer = Rx.Observable.merge(source1, source2).scan(
function (arr, item) {
if (!magic) {
return arr.push(item);
}
else {
return [item]; //Clear the buffer from previous items
}
}, []);
我还想要一个 "consumer" 缓冲区,它可以从缓冲区中移出项目并对它们进行操作。我如何实现它并确保消费者更新可观察缓冲区?
编辑:我想将数据输入 SourceBuffer but it is only allowed to append data to it while it is not updating. That gives me a backpressure situation I guess. So I did try to create a controlled observable 但不知道如何使用我自己的缓冲创建我自己的版本。
So I have this basic buffer which appends everything from source1 and source2 to an array. Under certain conditions the buffer is cleared.
您需要做的是:
var sourceStream = Rx.Observable.merge(soruce1, source2);
var boundary = sourceStream.lift(someOperator) // for example sourceStream.skip(3);
// someOperator is where you perform the "magic"
var subscribeToThisStream = sourceStream.buffer( boundary );
// emits all items collected in the buffer between two boundary emitions
I also want to have a "consumer" of the buffer, which shifts items from the buffer and does things with them. How do I implement that and make sure the consumer updates the buffer observable?
如果你想通过订阅者来做,如果你想以 Rx 方式做事,那绝对是不可取的,在某些情况下甚至可能是不可能的。
RxJS 新手问题来了! 所以我有这个基本缓冲区,它将 source1 和 source2 中的所有内容附加到一个数组。在某些情况下,缓冲区会被清除。
var buffer = Rx.Observable.merge(source1, source2).scan(
function (arr, item) {
if (!magic) {
return arr.push(item);
}
else {
return [item]; //Clear the buffer from previous items
}
}, []);
我还想要一个 "consumer" 缓冲区,它可以从缓冲区中移出项目并对它们进行操作。我如何实现它并确保消费者更新可观察缓冲区?
编辑:我想将数据输入 SourceBuffer but it is only allowed to append data to it while it is not updating. That gives me a backpressure situation I guess. So I did try to create a controlled observable 但不知道如何使用我自己的缓冲创建我自己的版本。
So I have this basic buffer which appends everything from source1 and source2 to an array. Under certain conditions the buffer is cleared.
您需要做的是:
var sourceStream = Rx.Observable.merge(soruce1, source2);
var boundary = sourceStream.lift(someOperator) // for example sourceStream.skip(3);
// someOperator is where you perform the "magic"
var subscribeToThisStream = sourceStream.buffer( boundary );
// emits all items collected in the buffer between two boundary emitions
I also want to have a "consumer" of the buffer, which shifts items from the buffer and does things with them. How do I implement that and make sure the consumer updates the buffer observable?
如果你想通过订阅者来做,如果你想以 Rx 方式做事,那绝对是不可取的,在某些情况下甚至可能是不可能的。