Reactor/RxJava 从 SQS 读取时如何背压
Reactor/RxJava How do back pressure when reading from SQS
我是整个反应式范式的新手,我正在尝试了解从 SQS 等队列读取数据时背压的工作原理。
在 Reactor 中你有 Flux,在 RxJava 中你有 Observable 轮询 SQS 在后台,如:
while (true) {
Future<ReceiveMessageResult> future = sqsClient.receiveMessageAsync(queueUrl);
//emit or send to subscribers
}
假设您有一个下游组件需要进行速率受限的 REST 调用。您如何告诉轮询器由于速率限制而放慢速度,这样您就不会在内存中留下一堆可能导致 OOM 的实时消息?
在这种情况下,您需要使用Flowable。使用Flowable,订阅者可以一次请求一定数量的消息,在处理完收到的消息后,它会请求下一批。
参考:https://medium.com/@srinuraop/rxjava-backpressure-3376130e76c1
Flowable<Integer> observable = Flowable.range(1, 133);
observable.subscribe(new DefaultSubscriber<Integer>() {
@Override public void onStart() {
request(1);
}
@Override public void onNext(Integer t) {
LOGGER.info("item "+t);
//this where you request message, in this case one message at time
//and after processing one message it will request for next one.
request(1);
}
@Override public void onError(Throwable t) {
LOGGER.info(""+t);
}
@Override public void onComplete() {
LOGGER.info("complete");
}
});
我是整个反应式范式的新手,我正在尝试了解从 SQS 等队列读取数据时背压的工作原理。
在 Reactor 中你有 Flux,在 RxJava 中你有 Observable 轮询 SQS 在后台,如:
while (true) {
Future<ReceiveMessageResult> future = sqsClient.receiveMessageAsync(queueUrl);
//emit or send to subscribers
}
假设您有一个下游组件需要进行速率受限的 REST 调用。您如何告诉轮询器由于速率限制而放慢速度,这样您就不会在内存中留下一堆可能导致 OOM 的实时消息?
在这种情况下,您需要使用Flowable。使用Flowable,订阅者可以一次请求一定数量的消息,在处理完收到的消息后,它会请求下一批。
参考:https://medium.com/@srinuraop/rxjava-backpressure-3376130e76c1
Flowable<Integer> observable = Flowable.range(1, 133);
observable.subscribe(new DefaultSubscriber<Integer>() {
@Override public void onStart() {
request(1);
}
@Override public void onNext(Integer t) {
LOGGER.info("item "+t);
//this where you request message, in this case one message at time
//and after processing one message it will request for next one.
request(1);
}
@Override public void onError(Throwable t) {
LOGGER.info(""+t);
}
@Override public void onComplete() {
LOGGER.info("complete");
}
});