Observable 支持反应式拉动

Observable supporting reactive pull

我一直在为我认为是一个非常基本的问题而苦苦挣扎。

我有一个 Flowable,它从网络中检索一捆物品并发出它们。

Flowable
    .create(new FlowableOnSubscribe<Item>() {
        @Override
        public void subscribe(FlowableEmitter<Item> emitter) throws Exception {
            Item item = retrieveNext();

            while (item != null) {
                emitter.onNext(item);

                if (item.isLast) {
                    break;
                }

                item = retrieveNext();
            }

            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER)
    .doOnRequest(new LongConsumer() {
        @Override
        public void accept(long t) throws Exception {
            // ...
        }
    });

我需要它在每个请求的基础上工作。也就是说,我保留一个Subscription,必要时调用subscription.request(n)

关于 Backpressure 的文章("Reactive pull" 部分)描述了 Observer 的观点,并简单地提到了

For [reactive pull] to work, though, Observables A and B must respond correctly to the request() <...> such support is not a requirement for Observables

它没有详细说明原则上如何实现这种支持。

是否有实现此类 Observable/Flowable 的通用模式?我没有找到一个很好的例子或参考。我能想到的一个糟糕的选择是有一个监视器,当 emitter.requested() == 0 时我会在我的 while 循环中锁定并从另一个线程在 doOnRequest() 中解锁。但是这种方法感觉很麻烦。

那么这个"slowing down"一般是如何实现的呢?

谢谢。

使用Flowable.generate:

Flowable.generate(
  () -> generateItemRetriever(),
  (Retriever<Item> retriever, Emitter<Item> emitter) -> {
    Item item = retriever.retrieveNext();
    if(item!=null && !item.isLast()) {
      emitter.onNext(item);
    } else {
      emitter.onComplete();
    }
  }
)

生成的 Flowable 将具有背压感知和取消订阅感知功能。如果您的 Retriever 具有需要处理的状态,请使用 3 参数 generate 重载。

但是,如果您正在进行 HTTP 调用,我建议您研究一下 Retrofit 之类的东西,它与 RxJava 配合得很好。