如何使用 UI 中的指定值控制 Flux 流式传输间隔

How to control Flux streaming interval with a specified value from UI

我按以下方式定义了反应性终点(示例):

    @CrossOrigin(origins = "*")
    @ApiOperation(value = "Streams AMQ related monitor data.")
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> streamCustomerData() {
        return monitorService.createFluxForInterval("customers",
                gridService::fetchAllCustomers);
    }

    public Flux<ServerSentEvent> createFluxForInterval(ProcessName processName,
                Supplier method) {
            return Flux.interval(Duration.ZERO, Duration.ofSeconds(intervalPeriod))
                    .map(serverSentEvent -> ServerSentEvent.builder(method.get()).event(processName.description()).build());
        }
        

我已经在 yml 中配置了 intervalPeriod,但我想使用下拉菜单从 UI 中控制它。任何人都可以请建议 angular 代码如下

new Observable<any[]>(
  obs => {
    let eventSource = new EventSource("/customers");
    eventSource.addEventListener("customers", function (e: MessageEvent) {
      obs.next(JSON.parse(e.data));
    });
  });

从您分享的代码来看,您的 intervalPeriod 当前是一个 属性,它是在 Spring ApplicationContext 初始化时从您的应用程序 yaml 加载的。

现在,由于您需要 UI 的 user/admin 提供间隔时间,它需要作为此 [= 的路径参数或查询参数传递21=] 或其他一些 API(如果你想 re-use 在其他地方的这个间隔时间,只需存储该信息 in-memory/cache/DB)。

    @CrossOrigin(origins = "*")
    @ApiOperation(value = "Streams AMQ related monitor data.")
    @GetMapping(path= "/customers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> streamCustomerData(@RequestParam("intervalPeriod") long intervalPeriod) {
        return monitorService.createFluxForInterval("customers",intervalPeriod,
                gridService::fetchAllCustomers);
    }

    public Flux<ServerSentEvent> createFluxForInterval(ProcessName processName, long intervalPeriod,
                Supplier method) {
            return Flux.interval(Duration.ZERO, Duration.ofSeconds(intervalPeriod))
                    .map(serverSentEvent -> ServerSentEvent.builder(method.get()).event(processName.description()).build());
        }

我已将 intervalPeriod 作为路径参数传递,并且还关闭了现有连接 (eventSource.close()) 并在 intervalPeriod 发生变化时打开一个新连接,因为浏览器最多可以有 6 个连接。