How can I push messages to the client with a reactive Flux/Mono as soon as they are ready, instead of polling for status at an interval?
I am trying to push messages as soon as they are available/ready and close the connection after flushing, rather than polling for messages at an interval with a Spring reactive Flux.
@GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {
    return Flux.interval(Duration.ofSeconds(3))
            .map(status -> {
                if (getSomething(randomId).equalsIgnoreCase("value")) {
                    return "value";
                }
                return "ping";
            })
            .take(Duration.ofSeconds(60))
            .timeout(Duration.ofSeconds(60));
}
A Kafka listener updates the value for randomId in a map when the message arrives, and the getSomething method checks the map for that randomId on every interval tick. Instead of checking on an interval and storing the data in a map, I want to push the message to the client as soon as the listener receives it.
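For context, a minimal sketch of the state the polling approach assumes: a shared map that the Kafka listener writes into and that getSomething reads from. The statusMap name and the "pending" default are assumptions, not code from the question.

// Assumed backing state for the polling approach (names are illustrative).
private final Map<String, String> statusMap = new ConcurrentHashMap<>();

// Returns the latest status stored for this randomId, or "pending" if nothing has arrived yet.
private String getSomething(String randomId) {
    return statusMap.getOrDefault(randomId, "pending");
}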
This sounds like a job for Flux.create(), for example:
return Flux.<String>create(sink -> {
    if (getSomething(randomId).equalsIgnoreCase("value")) {
        sink.next("value");
    } else {
        sink.next("ping");
    }
});
/**
* Programmatically create a {@link Flux} with the capability of emitting multiple
* elements in a synchronous or asynchronous manner through the {@link FluxSink} API.
* This includes emitting elements from multiple threads.
* <p>
* <img class="marble" src="doc-files/marbles/createForFlux.svg" alt="">
* <p>
* This Flux factory is useful if one wants to adapt some other multi-valued async API
* and not worry about cancellation and backpressure (which is handled by buffering
* all signals if the downstream can't keep up).
* <p>
* For example:
*
* <pre><code>
* Flux.<String>create(emitter -> {
*
*     ActionListener al = e -> {
*         emitter.next(textField.getText());
*     };
*     // without cleanup support:
*
*     button.addActionListener(al);
*
*     // with cleanup support:
*
*     button.addActionListener(al);
*     emitter.onDispose(() -> {
*         button.removeListener(al);
*     });
* });
* </code></pre>
*
* @reactor.discard The {@link FluxSink} exposed by this operator buffers in case of
* overflow. The buffer is discarded when the main sequence is cancelled.
*
* @param <T> The type of values in the sequence
* @param emitter Consume the {@link FluxSink} provided per-subscriber by Reactor to generate signals.
* @return a {@link Flux}
* @see #push(Consumer)
*/
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
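Applied to the question, one way to use that pattern is to register each subscriber's FluxSink in a shared map keyed by randomId, let the Kafka listener look the sink up and emit, and clean up on dispose. The following is a minimal sketch under those assumptions; the sinks map and onMessage helper are illustrative, not part of the original answer.

// Hypothetical per-request registry shared between the controller and the Kafka listener.
private final Map<String, FluxSink<String>> sinks = new ConcurrentHashMap<>();

@GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {
    return Flux.<String>create(sink -> {
        sinks.put(randomId, sink);                     // make the sink reachable from the listener
        sink.onDispose(() -> sinks.remove(randomId));  // clean up when the client disconnects
    }).timeout(Duration.ofSeconds(60));
}

// Called by the Kafka listener once the message for this randomId arrives.
private void onMessage(String randomId, String value) {
    FluxSink<String> sink = sinks.remove(randomId);
    if (sink != null) {
        sink.next(value);
        sink.complete(); // completes the Flux and closes the SSE connection
    }
}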
I built my solution based on this Stack Overflow answer, using EmitterProcessor to hot-publish messages as soon as they become available.
Here is the sample code:
@GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {
    EmitterProcessor<String> emitterProcessor = EmitterProcessor.create();
    Flux<String> autoConnect = emitterProcessor.publish().autoConnect();
    FluxSink<String> sink = emitterProcessor.sink();

    // Store the randomId together with its processor so the Kafka listener can find it.
    randomIdMap.putIfAbsent(randomId, emitterProcessor);

    // Returns "ping" status to notify the client that the connection is alive
    // until the message for this randomId is received.
    sendPingStatus(sink, randomId);

    return autoConnect;
}
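sendPingStatus is not shown in the answer; a plausible sketch (an assumption, not the author's code) is a heartbeat that emits "ping" on an interval until the stream is disposed:

// Hypothetical heartbeat helper: emits "ping" every few seconds to keep the connection alive.
private void sendPingStatus(FluxSink<String> sink, String randomId) {
    Disposable heartbeat = Flux.interval(Duration.ofSeconds(3))
            .map(tick -> "ping")
            .subscribe(sink::next);
    // Stop the heartbeat when the sink completes or the client disconnects.
    sink.onDispose(heartbeat);
}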
The method below shows how to push the message to the client and complete the Flux connection once the message arrives at the Kafka consumer.
@KafkaListener(topics = "some-subscription-id",
        containerFactory = "kafkaListenerContainerFactory")
public void pushMessage(SomeMessage message, Acknowledgment acknowledgment) {
    // Look up the processor registered for this randomId (in practice derived from the message).
    EmitterProcessor emitter = randomIdMap.get("randomId");
    if (emitter != null) {
        // Push the message to the waiting client and complete the stream.
        emitter.onNext(message);
        emitter.onComplete();
        randomIdMap.remove("randomId");
        acknowledgment.acknowledge();
    }
}
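Note that EmitterProcessor has been deprecated since Reactor 3.4 in favor of the Sinks API. The same push-and-complete pattern can be sketched with Sinks.Many as follows; the randomIdMap, the message.getRandomId() accessor, and the toString() conversion are assumptions, not from the original post.

// Registry of per-request sinks, keyed by randomId (illustrative).
private final Map<String, Sinks.Many<String>> randomIdMap = new ConcurrentHashMap<>();

@GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {
    Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
    randomIdMap.putIfAbsent(randomId, sink);
    return sink.asFlux().timeout(Duration.ofSeconds(60));
}

@KafkaListener(topics = "some-subscription-id",
        containerFactory = "kafkaListenerContainerFactory")
public void pushMessage(SomeMessage message, Acknowledgment acknowledgment) {
    Sinks.Many<String> sink = randomIdMap.remove(message.getRandomId()); // assumed accessor
    if (sink != null) {
        sink.tryEmitNext(message.toString()); // assumed String conversion
        sink.tryEmitComplete();               // completes the Flux and closes the connection
        acknowledgment.acknowledge();
    }
}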