Reactor/RxJava 从 SQS 读取时如何背压

Reactor/RxJava How do back pressure when reading from SQS

我是整个反应式范式的新手,我正在尝试了解从 SQS 等队列读取数据时背压的工作原理。

在 Reactor 中你有 Flux,在 RxJava 中你有 Observable 轮询 SQS 在后台,如:

while (true) {
   Future<ReceiveMessageResult> future = sqsClient.receiveMessageAsync(queueUrl);
   //emit or send to subscribers
}

假设您有一个下游组件需要进行速率受限的 REST 调用。您如何告诉轮询器由于速率限制而放慢速度,这样您就不会在内存中留下一堆可能导致 OOM 的实时消息?

在这种情况下,您需要使用Flowable。使用Flowable,订阅者可以一次请求一定数量的消息,在处理完收到的消息后,它会请求下一批。

参考:https://medium.com/@srinuraop/rxjava-backpressure-3376130e76c1

Flowable<Integer> observable = Flowable.range(1, 133);
observable.subscribe(new DefaultSubscriber<Integer>() {
    @Override public void onStart() {
        request(1);
    }
    @Override public void onNext(Integer t) {
        LOGGER.info("item "+t);
        //this where you request message, in this case one message at time 
        //and after processing one message it will request for next one.
        request(1);
    }
    @Override public void onError(Throwable t) {
        LOGGER.info(""+t);
    }
    @Override public void onComplete() {
        LOGGER.info("complete");
    }
});