一次只发出 1 个事件的 Reactor Sink?
Reactor Sink that emits only 1 event at a time?
我正在玩 Replaying Reactor Sinks,我正在尝试实现单播和重播处理器的混合。我希望它同时只发送给一个订阅者 (UnicastProcessor),但它也可以在订阅时发送默认值 (ReplayProcessor)。这是类似于真实案例的内容:
Flux<Boolean> monoC = Sinks.many().replay().latestOrDefault(true).asFlux().doOnNext(integer -> System.out.println(new Date() + " - " + Thread.currentThread().getName() + " emiting next"));
for(int i = 0; i < 5; i++) {
new Thread(() -> {
monoC.flatMap(unused ->
webClientBuilder.build()
.get()
.uri("https://www.google.com")
.retrieve()
.toEntityFlux(String.class)
.doOnSuccess(stringResponseEntity -> {
System.out.println(new Date() + " - " + Thread.currentThread().getName() + " finished processing");
})
).subscribe();
}).start();
}
即打印:
emiting next
...
emiting next
finished processing
...
finished processing
相反,我希望它打印:
emiting next
finished processing
...
emiting next
finished processing
更新,对真实案例场景的更多说明:
真实案例场景是:我有一个 Spring WebFlux 应用程序,它充当中继器,它在特定端点 A 上接收请求,并将其中继到另一个微服务 B。然后该微服务可以如果我走得太快,请回复 429,并回复 header 我必须等待多长时间才能再次重试。我已经通过 .retry 运算符和 Mono.delay 实现了重试,但与此同时,我可以在我的第一个端点 A 上收到另一个请求,该请求必须被阻止,直到 Mono.delay 完成.
我正在尝试通过 Replay Sink 实现这一点,因此在收到 429 后,我向接收器发出一个“false”,在 Mono.delay 结束后,它向接收器发出一个 true,因此,如果同时我收到关于 A 的任何进一步请求,它可以过滤掉所有错误并等待发出 true。
我最重要的问题是,当我收到太多中继 A 的请求时,微服务 B 开始响应缓慢,并且变得过载。因此,我想限制 Sink 发出的速率。准确地说,我希望发布者发出一个值,但在订阅者点击 onCompleted 之前不要再发出任何值。
一旦我正确理解了您的问题,您希望按顺序处理对 B 的请求。在那种情况下,你应该看看 https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency)
我觉得你的情况应该是这样的
//sinks should be global variable for your controller, initialized in @PostConstruct
var sinks = Sinks
//unsafe is required for multithreading
.unsafe()
.many()
.replay()
.latest();
sinks.asFlux()
.doOnNext(it -> System.out.printf("%s is emitting %s\n", Thread.currentThread().getName(), it))
.flatMap(counter -> {
return webClientBuilder.build()
.get()
.uri("https://www.google.com")
.retrieve()
.toEntityFlux(String.class)
.doOnSuccess(stringResponseEntity -> {
System.out.println(counter + " " + new Date() + " - " + Thread.currentThread().getName() + " finished processing with " + stringResponseEntity.getStatusCode());
})
.then(Mono.just(counter));
//concurrency = 1 causes the flatMap being handled only once in parallel
}, 1)
.doOnError(Throwable::printStackTrace)
//this subscription also must be done in @PostConstruct
.subscribe(counter -> System.out.printf("%s completed in %s\n", counter, Thread.currentThread().getName()));
//and this is your endpoint method
for (int i = 0; i < 5; i++) {
int counter = i;
new Thread(() -> {
var result = sinks.tryEmitNext(counter);
if (result.isFailure()) {
//mb in that case you should retry
System.out.printf("%s emitted %s. with fail: %s\n", Thread.currentThread().getName(), counter, result);
} else {
System.out.printf("%s successfully emitted %s\n", Thread.currentThread().getName(), counter);
}
}).start();
}
我正在玩 Replaying Reactor Sinks,我正在尝试实现单播和重播处理器的混合。我希望它同时只发送给一个订阅者 (UnicastProcessor),但它也可以在订阅时发送默认值 (ReplayProcessor)。这是类似于真实案例的内容:
Flux<Boolean> monoC = Sinks.many().replay().latestOrDefault(true).asFlux().doOnNext(integer -> System.out.println(new Date() + " - " + Thread.currentThread().getName() + " emiting next"));
for(int i = 0; i < 5; i++) {
new Thread(() -> {
monoC.flatMap(unused ->
webClientBuilder.build()
.get()
.uri("https://www.google.com")
.retrieve()
.toEntityFlux(String.class)
.doOnSuccess(stringResponseEntity -> {
System.out.println(new Date() + " - " + Thread.currentThread().getName() + " finished processing");
})
).subscribe();
}).start();
}
即打印:
emiting next
...
emiting next
finished processing
...
finished processing
相反,我希望它打印:
emiting next
finished processing
...
emiting next
finished processing
更新,对真实案例场景的更多说明:
真实案例场景是:我有一个 Spring WebFlux 应用程序,它充当中继器,它在特定端点 A 上接收请求,并将其中继到另一个微服务 B。然后该微服务可以如果我走得太快,请回复 429,并回复 header 我必须等待多长时间才能再次重试。我已经通过 .retry 运算符和 Mono.delay 实现了重试,但与此同时,我可以在我的第一个端点 A 上收到另一个请求,该请求必须被阻止,直到 Mono.delay 完成.
我正在尝试通过 Replay Sink 实现这一点,因此在收到 429 后,我向接收器发出一个“false”,在 Mono.delay 结束后,它向接收器发出一个 true,因此,如果同时我收到关于 A 的任何进一步请求,它可以过滤掉所有错误并等待发出 true。
我最重要的问题是,当我收到太多中继 A 的请求时,微服务 B 开始响应缓慢,并且变得过载。因此,我想限制 Sink 发出的速率。准确地说,我希望发布者发出一个值,但在订阅者点击 onCompleted 之前不要再发出任何值。
一旦我正确理解了您的问题,您希望按顺序处理对 B 的请求。在那种情况下,你应该看看 https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency)
我觉得你的情况应该是这样的
//sinks should be global variable for your controller, initialized in @PostConstruct
var sinks = Sinks
//unsafe is required for multithreading
.unsafe()
.many()
.replay()
.latest();
sinks.asFlux()
.doOnNext(it -> System.out.printf("%s is emitting %s\n", Thread.currentThread().getName(), it))
.flatMap(counter -> {
return webClientBuilder.build()
.get()
.uri("https://www.google.com")
.retrieve()
.toEntityFlux(String.class)
.doOnSuccess(stringResponseEntity -> {
System.out.println(counter + " " + new Date() + " - " + Thread.currentThread().getName() + " finished processing with " + stringResponseEntity.getStatusCode());
})
.then(Mono.just(counter));
//concurrency = 1 causes the flatMap being handled only once in parallel
}, 1)
.doOnError(Throwable::printStackTrace)
//this subscription also must be done in @PostConstruct
.subscribe(counter -> System.out.printf("%s completed in %s\n", counter, Thread.currentThread().getName()));
//and this is your endpoint method
for (int i = 0; i < 5; i++) {
int counter = i;
new Thread(() -> {
var result = sinks.tryEmitNext(counter);
if (result.isFailure()) {
//mb in that case you should retry
System.out.printf("%s emitted %s. with fail: %s\n", Thread.currentThread().getName(), counter, result);
} else {
System.out.printf("%s successfully emitted %s\n", Thread.currentThread().getName(), counter);
}
}).start();
}