Spring WebFlux(通量):如何动态发布
Spring WebFlux (Flux): how to publish dynamically
我是 Reactive 编程和 Spring WebFlux 的新手。我想让我的 App 1 通过 Flux 发布 Server Sent 事件,而我的 App 2 不断地监听它。
我希望 Flux 按需发布(例如,当发生某些事情时)。我找到的所有例子都是使用 Flux.interval 定期发布事件,并且一旦创建 Flux 中的内容似乎就无法 append/modify
我怎样才能实现我的目标?或者我在概念上完全错误。
使用 FluxProcessor
和 FluxSink
“动态”发布
手动向 Flux
提供数据的技术之一是使用 FluxProcessor#sink
方法,如下例
@SpringBootApplication
@RestController
public class DemoApplication {
final FluxProcessor processor;
final FluxSink sink;
final AtomicLong counter;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication() {
this.processor = DirectProcessor.create().serialize();
this.sink = processor.sink();
this.counter = new AtomicLong();
}
@GetMapping("/send")
public void test() {
sink.next("Hello World #" + counter.getAndIncrement());
}
@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return processor.map(e -> ServerSentEvent.builder(e).build());
}
}
在这里,我创建了 DirectProcessor
以支持多个订阅者,它们将监听数据流。此外,我还提供了额外的 FluxProcessor#serialize
,它为多生产者提供安全支持(从不同线程调用而不违反 Reactive Streams 规范规则,尤其是 rule 1.3)。最后,通过调用“http://localhost:8080/send”,我们将看到消息 Hello World #1
(当然,前提是您之前连接到“http://localhost:8080”)
Reactor 3.4 更新
在 Reactor 3.4 中,您有一个名为 reactor.core.publisher.Sinks
的新 API。 Sinks
API 为手动数据发送提供了一个流畅的构建器,它允许您指定诸如流中的元素数量和背压行为、支持的订阅者数量和重放功能等内容:
@SpringBootApplication
@RestController
public class DemoApplication {
final Sinks.Many sink;
final AtomicLong counter;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication() {
this.sink = Sinks.many().multicast().onBackpressureBuffer();
this.counter = new AtomicLong();
}
@GetMapping("/send")
public void test() {
EmitResult result = sink.tryEmitNext("Hello World #" + counter.getAndIncrement());
if (result.isFailure()) {
// do something here, since emission failed
}
}
@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return sink.asFlux().map(e -> ServerSentEvent.builder(e).build());
}
}
请注意,通过 Sinks
API 发送消息引入了 emission
的新概念及其结果。 API 的原因是 Reactor 扩展了 Reactive-Streams 并且必须遵循背压控制。也就是说,如果您 emit
的信号比请求的多,并且底层实现不支持缓冲,您的消息将不会被传递。因此,tryEmitNext
returns 的结果 EmitResult
指示消息是否已发送。
此外,请注意,默认情况下 Sinsk
API 给出 Sink
的序列化版本,这意味着您不必关心并发性。但是,如果您事先知道消息的发送是串行的,您可以构建一个 Sinks.unsafe()
版本,它不会序列化给定的消息
另一个想法,使用 EmitterProcessor 作为通量的入口
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
public class MyEmitterProcessor {
EmitterProcessor<String> emitterProcessor;
public static void main(String args[]) {
MyEmitterProcessor myEmitterProcessor = new MyEmitterProcessor();
Flux<String> publisher = myEmitterProcessor.getPublisher();
myEmitterProcessor.onNext("A");
myEmitterProcessor.onNext("B");
myEmitterProcessor.onNext("C");
myEmitterProcessor.complete();
publisher.subscribe(x -> System.out.println(x));
}
public Flux<String> getPublisher() {
emitterProcessor = EmitterProcessor.create();
return emitterProcessor.map(x -> "consume: " + x);
}
public void onNext(String nextString) {
emitterProcessor.onNext(nextString);
}
public void complete() {
emitterProcessor.onComplete();
}
}
更多信息,see here from Reactor doc。文档本身有一个建议,“大多数时候,你应该尽量避免使用处理器。它们更难正确使用,并且容易出现一些极端情况。”但是我不知道哪种极端情况。
我是 Reactive 编程和 Spring WebFlux 的新手。我想让我的 App 1 通过 Flux 发布 Server Sent 事件,而我的 App 2 不断地监听它。
我希望 Flux 按需发布(例如,当发生某些事情时)。我找到的所有例子都是使用 Flux.interval 定期发布事件,并且一旦创建 Flux 中的内容似乎就无法 append/modify
我怎样才能实现我的目标?或者我在概念上完全错误。
使用 FluxProcessor
和 FluxSink
“动态”发布
手动向 Flux
提供数据的技术之一是使用 FluxProcessor#sink
方法,如下例
@SpringBootApplication
@RestController
public class DemoApplication {
final FluxProcessor processor;
final FluxSink sink;
final AtomicLong counter;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication() {
this.processor = DirectProcessor.create().serialize();
this.sink = processor.sink();
this.counter = new AtomicLong();
}
@GetMapping("/send")
public void test() {
sink.next("Hello World #" + counter.getAndIncrement());
}
@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return processor.map(e -> ServerSentEvent.builder(e).build());
}
}
在这里,我创建了 DirectProcessor
以支持多个订阅者,它们将监听数据流。此外,我还提供了额外的 FluxProcessor#serialize
,它为多生产者提供安全支持(从不同线程调用而不违反 Reactive Streams 规范规则,尤其是 rule 1.3)。最后,通过调用“http://localhost:8080/send”,我们将看到消息 Hello World #1
(当然,前提是您之前连接到“http://localhost:8080”)
Reactor 3.4 更新
在 Reactor 3.4 中,您有一个名为 reactor.core.publisher.Sinks
的新 API。 Sinks
API 为手动数据发送提供了一个流畅的构建器,它允许您指定诸如流中的元素数量和背压行为、支持的订阅者数量和重放功能等内容:
@SpringBootApplication
@RestController
public class DemoApplication {
final Sinks.Many sink;
final AtomicLong counter;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication() {
this.sink = Sinks.many().multicast().onBackpressureBuffer();
this.counter = new AtomicLong();
}
@GetMapping("/send")
public void test() {
EmitResult result = sink.tryEmitNext("Hello World #" + counter.getAndIncrement());
if (result.isFailure()) {
// do something here, since emission failed
}
}
@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return sink.asFlux().map(e -> ServerSentEvent.builder(e).build());
}
}
请注意,通过 Sinks
API 发送消息引入了 emission
的新概念及其结果。 API 的原因是 Reactor 扩展了 Reactive-Streams 并且必须遵循背压控制。也就是说,如果您 emit
的信号比请求的多,并且底层实现不支持缓冲,您的消息将不会被传递。因此,tryEmitNext
returns 的结果 EmitResult
指示消息是否已发送。
此外,请注意,默认情况下 Sinsk
API 给出 Sink
的序列化版本,这意味着您不必关心并发性。但是,如果您事先知道消息的发送是串行的,您可以构建一个 Sinks.unsafe()
版本,它不会序列化给定的消息
另一个想法,使用 EmitterProcessor 作为通量的入口
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
public class MyEmitterProcessor {
EmitterProcessor<String> emitterProcessor;
public static void main(String args[]) {
MyEmitterProcessor myEmitterProcessor = new MyEmitterProcessor();
Flux<String> publisher = myEmitterProcessor.getPublisher();
myEmitterProcessor.onNext("A");
myEmitterProcessor.onNext("B");
myEmitterProcessor.onNext("C");
myEmitterProcessor.complete();
publisher.subscribe(x -> System.out.println(x));
}
public Flux<String> getPublisher() {
emitterProcessor = EmitterProcessor.create();
return emitterProcessor.map(x -> "consume: " + x);
}
public void onNext(String nextString) {
emitterProcessor.onNext(nextString);
}
public void complete() {
emitterProcessor.onComplete();
}
}
更多信息,see here from Reactor doc。文档本身有一个建议,“大多数时候,你应该尽量避免使用处理器。它们更难正确使用,并且容易出现一些极端情况。”但是我不知道哪种极端情况。