一次只发出 1 个事件的 Reactor Sink?

Reactor Sink that emits only 1 event at a time?

我正在玩 Replaying Reactor Sinks,我正在尝试实现单播和重播处理器的混合。我希望它同时只发送给一个订阅者 (UnicastProcessor),但它也可以在订阅时发送默认值 (ReplayProcessor)。这是类似于真实案例的内容:

Flux<Boolean> monoC = Sinks.many().replay().latestOrDefault(true).asFlux().doOnNext(integer -> System.out.println(new Date() + " - " + Thread.currentThread().getName() + "    emiting next"));
for(int i = 0; i < 5; i++) {
    new Thread(() -> {
        monoC.flatMap(unused ->
                webClientBuilder.build()
                        .get()
                        .uri("https://www.google.com")
                        .retrieve()
                        .toEntityFlux(String.class)
                        .doOnSuccess(stringResponseEntity -> {
                            System.out.println(new Date() + " - " + Thread.currentThread().getName() + "    finished processing");
                        })
        ).subscribe();
    }).start();
}

即打印:

emiting next
...
emiting next
finished processing
...
finished processing

相反,我希望它打印:

emiting next
finished processing
...
emiting next
finished processing

更新,对真实案例场景的更多说明:

真实案例场景是:我有一个 Spring WebFlux 应用程序,它充当中继器,它在特定端点 A 上接收请求,并将其中继到另一个微服务 B。然后该微服务可以如果我走得太快,请回复 429,并回复 header 我必须等待多长时间才能再次重试。我已经通过 .retry 运算符和 Mono.delay 实现了重试,但与此同时,我可以在我的第一个端点 A 上收到另一个请求,该请求必须被阻止,直到 Mono.delay 完成.

我正在尝试通过 Replay Sink 实现这一点,因此在收到 429 后,我向接收器发出一个“false”,在 Mono.delay 结束后,它向接收器发出一个 true,因此,如果同时我收到关于 A 的任何进一步请求,它可以过滤掉所有错误并等待发出 true。

我最重要的问题是,当我收到太多中继 A 的请求时,微服务 B 开始响应缓慢,并且变得过载。因此,我想限制 Sink 发出的速率。准确地说,我希望发布者发出一个值,但在订阅者点击 onCompleted 之前不要再发出任何值。

一旦我正确理解了您的问题,您希望按顺序处理对 B 的请求。在那种情况下,你应该看看 https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-

public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency)

我觉得你的情况应该是这样的

//sinks should be global variable for your controller, initialized in @PostConstruct
        var sinks = Sinks
                //unsafe is required for multithreading
                .unsafe()
                .many()
                .replay()
                .latest();
        sinks.asFlux()
                .doOnNext(it -> System.out.printf("%s is emitting %s\n", Thread.currentThread().getName(), it))
                .flatMap(counter -> {
                    return webClientBuilder.build()
                            .get()
                            .uri("https://www.google.com")
                            .retrieve()
                            .toEntityFlux(String.class)
                            .doOnSuccess(stringResponseEntity -> {
                                System.out.println(counter + " " + new Date() + " - " + Thread.currentThread().getName() + "    finished processing with " + stringResponseEntity.getStatusCode());
                            })
                            .then(Mono.just(counter));
                    //concurrency = 1 causes the flatMap being handled only once in parallel
                }, 1)
                .doOnError(Throwable::printStackTrace)
                //this subscription also must be done in @PostConstruct
                .subscribe(counter -> System.out.printf("%s completed in %s\n", counter, Thread.currentThread().getName()));

        //and this is your endpoint method
        for (int i = 0; i < 5; i++) {
            int counter = i;
            new Thread(() -> {
                var result = sinks.tryEmitNext(counter);
                if (result.isFailure()) {
                    //mb in that case you should retry
                    System.out.printf("%s emitted %s. with fail: %s\n", Thread.currentThread().getName(), counter, result);
                } else {
                    System.out.printf("%s successfully emitted %s\n", Thread.currentThread().getName(), counter);
                }
            }).start();
        }