带背压的 Rxjs 缓冲区实现

Rxjs buffer implementation with backpressure

RxJS 新手问题来了! 所以我有这个基本缓冲区,它将 source1 和 source2 中的所有内容附加到一个数组。在某些情况下,缓冲区会被清除。

var buffer = Rx.Observable.merge(source1, source2).scan(
  function (arr, item) { 
    if (!magic) { 
      return arr.push(item); 
    }
    else { 
      return [item]; //Clear the buffer from previous items
    } 
  }, []);

我还想要一个 "consumer" 缓冲区,它可以从缓冲区中移出项目并对它们进行操作。我如何实现它并确保消费者更新可观察缓冲区?

编辑:我想将数据输入 SourceBuffer but it is only allowed to append data to it while it is not updating. That gives me a backpressure situation I guess. So I did try to create a controlled observable 但不知道如何使用我自己的缓冲创建我自己的版本。

So I have this basic buffer which appends everything from source1 and source2 to an array. Under certain conditions the buffer is cleared.

您需要做的是:

var sourceStream = Rx.Observable.merge(soruce1, source2);

var boundary = sourceStream.lift(someOperator) // for example sourceStream.skip(3);
// someOperator is where you perform the "magic"

var subscribeToThisStream = sourceStream.buffer( boundary ); 
// emits all items collected in the buffer between two boundary emitions

I also want to have a "consumer" of the buffer, which shifts items from the buffer and does things with them. How do I implement that and make sure the consumer updates the buffer observable?

如果你想通过订阅者来做,如果你想以 Rx 方式做事,那绝对是不可取的,在某些情况下甚至可能是不可能的。