Reactor - 在处理错误的情况下延迟 Flux 元素
Reactor - Delay Flux elements in case of processing errors
我遇到了与 this question 类似的问题,但我没有看到可接受的答案。我已经研究过了,没有得到满意的答案。
我有一个反应式 Kafka 消费者(Spring 反应器),轮询量为 'x',应用程序使用反应式 Web 客户端将轮询的消息推送到反应式端点。这里的问题是外部服务可能会超时执行不同的操作,当我们看到很多故障时,我将不得不调整 Kafka 消费者以在断路器打开(或启动背压)时轮询更少的消息。现在的reactor有没有办法自动
- 当断路器处于打开状态时做出反应,减少轮询量或减缓消耗。
- 当电路关闭时,将轮询数量增加到以前的状态(如果外部服务下降,则会按比例增加)。
我不想使用 delayElements
或 delayUntil
,因为它们本质上大多是静态的,并且希望应用程序在运行时做出反应。如何配置这些端到端背压?当电路关闭、部分关闭和在应用程序配置中打开时,我会为消费者提供值。
由于背压是基于消费者的缓慢,实现这一点的一种方法是将某些异常类型转换为延迟。为此,您可以使用 onErrorResume
,如下所示:
long start = System.currentTimeMillis();
Flux.range(1, 1000)
.doOnNext(item -> System.out.println("Elpased " + (System.currentTimeMillis() - start) + " millis for item: " + item))
.flatMap(item -> process(item).onErrorResume(this::slowDown), 5) // concurrency limit for demo
.blockLast();
System.out.println("Flow took " + (System.currentTimeMillis() - start) + " milliseconds.");
private Mono<Integer> process(Integer item) {
// simulate error for some items
if (item >= 50 && item <= 100) {
return Mono.error(new RuntimeException("Downstream failed."));
}
// normal processing
return Mono.delay(Duration.ofMillis(10))
.thenReturn(item);
}
private Mono<Integer> slowDown(Throwable e) {
if (e instanceof RuntimeException) { // you could check for circuit breaker exception
return Mono.delay(Duration.ofMillis(1000)).then(Mono.empty()); // delay to slow down
}
return Mono.empty(); // no delay for other errors
}
如果您检查此代码的输出,您可以看到项目 50 和 100 之间有一些减速,但它在前后以正常速度工作。
请注意,我的示例未使用 Kafka。当您使用支持背压的 reactor-kafka 库时,它应该以与此虚拟示例相同的方式工作。
此外,由于 Flux 可能会同时处理项目,因此不会立即减速,它会在适当减速之前尝试处理一些额外的项目。
我遇到了与 this question 类似的问题,但我没有看到可接受的答案。我已经研究过了,没有得到满意的答案。
我有一个反应式 Kafka 消费者(Spring 反应器),轮询量为 'x',应用程序使用反应式 Web 客户端将轮询的消息推送到反应式端点。这里的问题是外部服务可能会超时执行不同的操作,当我们看到很多故障时,我将不得不调整 Kafka 消费者以在断路器打开(或启动背压)时轮询更少的消息。现在的reactor有没有办法自动
- 当断路器处于打开状态时做出反应,减少轮询量或减缓消耗。
- 当电路关闭时,将轮询数量增加到以前的状态(如果外部服务下降,则会按比例增加)。
我不想使用 delayElements
或 delayUntil
,因为它们本质上大多是静态的,并且希望应用程序在运行时做出反应。如何配置这些端到端背压?当电路关闭、部分关闭和在应用程序配置中打开时,我会为消费者提供值。
由于背压是基于消费者的缓慢,实现这一点的一种方法是将某些异常类型转换为延迟。为此,您可以使用 onErrorResume
,如下所示:
long start = System.currentTimeMillis();
Flux.range(1, 1000)
.doOnNext(item -> System.out.println("Elpased " + (System.currentTimeMillis() - start) + " millis for item: " + item))
.flatMap(item -> process(item).onErrorResume(this::slowDown), 5) // concurrency limit for demo
.blockLast();
System.out.println("Flow took " + (System.currentTimeMillis() - start) + " milliseconds.");
private Mono<Integer> process(Integer item) {
// simulate error for some items
if (item >= 50 && item <= 100) {
return Mono.error(new RuntimeException("Downstream failed."));
}
// normal processing
return Mono.delay(Duration.ofMillis(10))
.thenReturn(item);
}
private Mono<Integer> slowDown(Throwable e) {
if (e instanceof RuntimeException) { // you could check for circuit breaker exception
return Mono.delay(Duration.ofMillis(1000)).then(Mono.empty()); // delay to slow down
}
return Mono.empty(); // no delay for other errors
}
如果您检查此代码的输出,您可以看到项目 50 和 100 之间有一些减速,但它在前后以正常速度工作。
请注意,我的示例未使用 Kafka。当您使用支持背压的 reactor-kafka 库时,它应该以与此虚拟示例相同的方式工作。
此外,由于 Flux 可能会同时处理项目,因此不会立即减速,它会在适当减速之前尝试处理一些额外的项目。