Project Reactor 使用 EmitterProcessor 作为事件总线

Project Reactor Using EmitterProcessor as an event bus

我们正在测试如何将 EmitterProcessor 用作事件总线。基本上我们想间接地让 Spring 控制器知道某事已成功完成,我们传递一个在 spring 控制器中侦听的唯一键(因此,如果不同的键进入不同的方法,我们忽略它)。

public class RequestVerifier {

    // constructor
    public RequestVerifier(EmitterProcessor<String> emitterProcessor) {
        this.emitterProcessor = emitterProcessor;
        // put this subscription here for testing purposes
        emitterProcessor.subscribe(value -> {
            System.out.println(value);
        });
    }

    public void runValue(RequestData data) {
        requestService.accept(data);
        FluxSink<String> sink = emitterProcessor.sink();
        sink.next("SOME-KEY");
    }

}
@Configuration
public class AppConfiguration {

    @Bean(name = "flux-event-handler-event-bus-emitter")
    public EmitterProcessor<String> createEventBusEmitter(){
        EmitterProcessor<String> emitter = EmitterProcessor.create();
        return emitter;
    }

}

问题是第一条消息被传递到 emitterProcessor.subscribe。但是,当第二次调用 runValue 时发送第二条消息时,我们会得到 java.lang.IllegalStateException: Spec. Rule 2.12 - Subscriber.onSubscribe MUST NOT be called more than once (based on object equality)。我们如何让订阅接受源源不断的数据?

此外,如果相关的话,我们计划在轴突事件处理程序中使用它,并从事件处理程序传递消息或密钥,以通知等待的控制器将结果发送回用户。

更新

也尝试了以下方法,但没有用

    @Bean(name = "flux-event-handler-event-bus-emitter")
    public EmitterProcessor<String> createEventBusEmitter(){
        EmitterProcessor<String> emitter = EmitterProcessor.create();
        return emitter;
    }

    @Bean(name = "event-autoconnector")
    public Flux<String> returnFlux(EmitterProcessor<String> emitter){
        return emitter.publish().autoConnect();
    }

在我的 playground Reactive CQRS 存储库中,我玩过围绕 Axon Framework 应用程序包装 reactive API。

我想您可能会在这里找到您要找的东西 https://github.com/stefanvozd/Reactive-CQRS,但请注意,它仍在进行中并且缺少 comments/documentation...

根据您尝试使用此流执行的操作,这里有几个示例

  1. Example 发送命令并等待投影变为具体化的其余控制器,returns 以非阻塞方式向用户显示此视图。为此,您应该使用订阅查询
  2. Example 您可以作为 bean 注入并附加到事件流的 Reactive Event Bus 流
  3. Example 的反应式事件处理程序,使用反应式数据库驱动程序 r2dbc 保存投影,如果您使用 Axon 服务器,您将获得背压支持。

目前,这些都是实验性示例,如果您可以提供更多有关您正在努力完成的目标的信息,我可以为您提供一些指导