Project Reactor 使用 EmitterProcessor 作为事件总线
Project Reactor Using EmitterProcessor as an event bus
我们正在测试如何将 EmitterProcessor 用作事件总线。基本上我们想间接地让 Spring 控制器知道某事已成功完成,我们传递一个在 spring 控制器中侦听的唯一键(因此,如果不同的键进入不同的方法,我们忽略它)。
public class RequestVerifier {
// constructor
public RequestVerifier(EmitterProcessor<String> emitterProcessor) {
this.emitterProcessor = emitterProcessor;
// put this subscription here for testing purposes
emitterProcessor.subscribe(value -> {
System.out.println(value);
});
}
public void runValue(RequestData data) {
requestService.accept(data);
FluxSink<String> sink = emitterProcessor.sink();
sink.next("SOME-KEY");
}
}
@Configuration
public class AppConfiguration {
@Bean(name = "flux-event-handler-event-bus-emitter")
public EmitterProcessor<String> createEventBusEmitter(){
EmitterProcessor<String> emitter = EmitterProcessor.create();
return emitter;
}
}
问题是第一条消息被传递到 emitterProcessor.subscribe
。但是,当第二次调用 runValue
时发送第二条消息时,我们会得到 java.lang.IllegalStateException: Spec. Rule 2.12 - Subscriber.onSubscribe MUST NOT be called more than once (based on object equality)
。我们如何让订阅接受源源不断的数据?
此外,如果相关的话,我们计划在轴突事件处理程序中使用它,并从事件处理程序传递消息或密钥,以通知等待的控制器将结果发送回用户。
更新
也尝试了以下方法,但没有用
@Bean(name = "flux-event-handler-event-bus-emitter")
public EmitterProcessor<String> createEventBusEmitter(){
EmitterProcessor<String> emitter = EmitterProcessor.create();
return emitter;
}
@Bean(name = "event-autoconnector")
public Flux<String> returnFlux(EmitterProcessor<String> emitter){
return emitter.publish().autoConnect();
}
在我的 playground Reactive CQRS 存储库中,我玩过围绕 Axon Framework 应用程序包装 reactive API。
我想您可能会在这里找到您要找的东西 https://github.com/stefanvozd/Reactive-CQRS,但请注意,它仍在进行中并且缺少 comments/documentation...
根据您尝试使用此流执行的操作,这里有几个示例
- Example 发送命令并等待投影变为具体化的其余控制器,returns 以非阻塞方式向用户显示此视图。为此,您应该使用订阅查询
- Example 您可以作为 bean 注入并附加到事件流的 Reactive Event Bus 流
- Example 的反应式事件处理程序,使用反应式数据库驱动程序 r2dbc 保存投影,如果您使用 Axon 服务器,您将获得背压支持。
目前,这些都是实验性示例,如果您可以提供更多有关您正在努力完成的目标的信息,我可以为您提供一些指导
我们正在测试如何将 EmitterProcessor 用作事件总线。基本上我们想间接地让 Spring 控制器知道某事已成功完成,我们传递一个在 spring 控制器中侦听的唯一键(因此,如果不同的键进入不同的方法,我们忽略它)。
public class RequestVerifier {
// constructor
public RequestVerifier(EmitterProcessor<String> emitterProcessor) {
this.emitterProcessor = emitterProcessor;
// put this subscription here for testing purposes
emitterProcessor.subscribe(value -> {
System.out.println(value);
});
}
public void runValue(RequestData data) {
requestService.accept(data);
FluxSink<String> sink = emitterProcessor.sink();
sink.next("SOME-KEY");
}
}
@Configuration
public class AppConfiguration {
@Bean(name = "flux-event-handler-event-bus-emitter")
public EmitterProcessor<String> createEventBusEmitter(){
EmitterProcessor<String> emitter = EmitterProcessor.create();
return emitter;
}
}
问题是第一条消息被传递到 emitterProcessor.subscribe
。但是,当第二次调用 runValue
时发送第二条消息时,我们会得到 java.lang.IllegalStateException: Spec. Rule 2.12 - Subscriber.onSubscribe MUST NOT be called more than once (based on object equality)
。我们如何让订阅接受源源不断的数据?
此外,如果相关的话,我们计划在轴突事件处理程序中使用它,并从事件处理程序传递消息或密钥,以通知等待的控制器将结果发送回用户。
更新
也尝试了以下方法,但没有用
@Bean(name = "flux-event-handler-event-bus-emitter")
public EmitterProcessor<String> createEventBusEmitter(){
EmitterProcessor<String> emitter = EmitterProcessor.create();
return emitter;
}
@Bean(name = "event-autoconnector")
public Flux<String> returnFlux(EmitterProcessor<String> emitter){
return emitter.publish().autoConnect();
}
在我的 playground Reactive CQRS 存储库中,我玩过围绕 Axon Framework 应用程序包装 reactive API。
我想您可能会在这里找到您要找的东西 https://github.com/stefanvozd/Reactive-CQRS,但请注意,它仍在进行中并且缺少 comments/documentation...
根据您尝试使用此流执行的操作,这里有几个示例
- Example 发送命令并等待投影变为具体化的其余控制器,returns 以非阻塞方式向用户显示此视图。为此,您应该使用订阅查询
- Example 您可以作为 bean 注入并附加到事件流的 Reactive Event Bus 流
- Example 的反应式事件处理程序,使用反应式数据库驱动程序 r2dbc 保存投影,如果您使用 Axon 服务器,您将获得背压支持。
目前,这些都是实验性示例,如果您可以提供更多有关您正在努力完成的目标的信息,我可以为您提供一些指导