BackpressureStrategy.BUFFER 和 rxjava 2 中的 onBackpressureBuffer 运算符之间的区别

Difference between BackpressureStrategy.BUFFER and onBackpressureBuffer operator in rxjava 2

我是反应式编程领域的新手,我正在尝试使用 rxjava 2 创建一个简单的反压感知消息处理。

以下是我要实现的工作流程:

  1. Flowable of a continuous string stream.

  2. 执行耗时操作并将消息更改为另一个字符串

  3. 执行另一个耗时操作。

现在我使用以下代码:

{
    Flowable.create(subscriber -> {
             some_stream.forEach(data -> {
                subscriber.onNext(data);
            });
        }, BackpressureStrategy.BUFFER).
    subscribeOn(Schedulers.io()). // Data emission will run io scheduler
    observeOn(Schedulers.computation()). // Map operation will run on computation scheduler
    map(val -> Time_Consuming_Task(val)). // Task returns another string
    observeOn(Schedulers.io()).  / Next consumer will run on computation scheduler
    subscribe(val -> Another_Time_Consuming_Task(val));
}

现在对于小型操作,我没有看到任何与背压相关的问题。

但对于大型流,我不知道它会如何表现。

现在我的问题是:-

  1. BackpressureStrategy.BUFFER 的情况下,默认缓冲区大小是多少?数据在哪里缓冲?

  2. 如果我想创建两个背压缓冲区,每个在每个耗时任务之前,我应该使用 onBackpressureBuffer operator 吗?

  3. 如果缓冲区已满,我不想丢失数据,我想等待或在那种情况下发生什么?

回答您的问题:

1。默认缓冲区大小因平台而异。在 JVM 上,每个环形缓冲区有 128 个项目,在 Android 上有 16 个项目 (Source)

这比之前的 1024 有所降低(您可以看到 RxJava 中正在实施的更改 here)。还有一个系统属性,您可以根据需要自行调整:

System.setProperty("rx.ring-buffer.size", "8");

由于它们被称为 环形缓冲区,因此它们存储在内存中。您可以阅读更多关于它们的信息 here.

2。 & 3. 如果它变满,它会开始覆盖自己。在这种情况下,使用 onBackpressureBuffer

A consequence of the circular buffer is that when it is full and a subsequent write is performed, then it starts overwriting the oldest data.

引自关于 Circular buffer 的维基文章。

当你知道你的 rx.ring-buffer.size 时,你能做的最好的事情就是使用 RxJava 2 中给出的以下 API:

onBackpressureBuffer(int capacity, // This is the given bound, not a setter for the ring buffer
    Action0 onOverflow, // The desired action to execute
    BackpressureOverflow.Strategy strategy) // The desired strategy to use

再一次,因为我不能说得更好,让我引用 Backpressure (2.0) 上的 RxJava wiki:

The BackpressureOverflow.Strategy is an interface actually but the class BackpressureOverflow offers 4 static fields with implementations of it representing typical actions:

  • ON_OVERFLOW_ERROR: this is the default behavior of the previous two overloads, signalling a BufferOverflowException
  • ON_OVERFLOW_DEFAULT: currently it is the same as ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST: if an overflow would happen, the current value will be simply ignored and only the old values will be delivered once the downstream requests.
  • ON_OVERFLOW_DROP_OLDEST: drops the oldest element in the buffer and adds the current value to it.

Note that the last two strategies cause discontinuity in the stream as they drop out elements. In addition, they won't signal BufferOverflowException.

这是一个例子:

Flowable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

值得注意的是:

The Observable type in RxJava 2.x has no concept of backpressure. Implementing Observable is effectively the same as using onBackpressureBuffer() by default. UI events, one-off network requests, and state changes should all work with this approach. The Completable, Maybe, and Single types can also dictate this behavior.

If you need to support backpressure, RxJava 2.x’s new class, Flowable, is backpressure-aware like Observable was in RxJava 1.x. However, the updated library now requires an explicit choice of a backpressure strategy to prevent surprise MissingBackpressureExceptions.

阅读更多: