Reactor - 在处理错误的情况下延迟 Flux 元素

Reactor - Delay Flux elements in case of processing errors

我遇到了与 this question 类似的问题,但我没有看到可接受的答案。我已经研究过了,没有得到满意的答案。

我有一个反应式 Kafka 消费者(Spring 反应器),轮询量为 'x',应用程序使用反应式 Web 客户端将轮询的消息推送到反应式端点。这里的问题是外部服务可能会超时执行不同的操作,当我们看到很多故障时,我将不得不调整 Kafka 消费者以在断路器打开(或启动背压)时轮询更少的消息。现在的reactor有没有办法自动

  1. 当断路器处于打开状态时做出反应,减少轮询量或减缓消耗。
  2. 当电路关闭时,将轮询数量增加到以前的状态(如果外部服务下降,则会按比例增加)。

我不想使用 delayElementsdelayUntil,因为它们本质上大多是静态的,并且希望应用程序在运行时做出反应。如何配置这些端到端背压?当电路关闭、部分关闭和在应用程序配置中打开时,我会为消费者提供值。

由于背压是基于消费者的缓慢,实现这一点的一种方法是将某些异常类型转换为延迟。为此,您可以使用 onErrorResume,如下所示:

long start = System.currentTimeMillis();

Flux.range(1, 1000)
        .doOnNext(item -> System.out.println("Elpased " + (System.currentTimeMillis() - start) + " millis for item: " + item))
        .flatMap(item -> process(item).onErrorResume(this::slowDown), 5) // concurrency limit for demo
        .blockLast();

System.out.println("Flow took " + (System.currentTimeMillis() - start) + " milliseconds.");

private Mono<Integer> process(Integer item) {
    // simulate error for some items
    if (item >= 50 && item <= 100) {
        return Mono.error(new RuntimeException("Downstream failed."));
    }

    // normal processing
    return Mono.delay(Duration.ofMillis(10))
            .thenReturn(item);
}

private Mono<Integer> slowDown(Throwable e) {
    if (e instanceof RuntimeException) { // you could check for circuit breaker exception
        return Mono.delay(Duration.ofMillis(1000)).then(Mono.empty()); // delay to slow down
    }

    return Mono.empty(); // no delay for other errors
}

如果您检查此代码的输出,您可以看到项目 50 和 100 之间有一些减速,但它在前后以正常速度工作。

请注意,我的示例未使用 Kafka。当您使用支持背压的 reactor-kafka 库时,它应该以与此虚拟示例相同的方式工作。

此外,由于 Flux 可能会同时处理项目,因此不会立即减速,它会在适当减速之前尝试处理一些额外的项目。