Spring 具有本地数据流服务器的 Cloud Stream AggregateApplication

Spring Cloud Stream AggregateApplication with local Data Flow Server

我正在尝试创建 Spring Cloud Stream 聚合应用程序,该应用程序 运行 带有数据流 Web 服务器,以便能够通过 Web UI.

管理应用程序

申请运行纳class:

@SpringBootApplication
public class Runner {

public static void main(String[] args) {
    new AggregateApplicationBuilder(args).web(true)
            .from(JSONFileSourceApplication.class).args("--fixedDelay=5000")
            .via(ProcessorOne.class)
            .to(LoggingSinkApplication.class).run(args);
}

这工作正常。现在尝试添加数据流服务器。创建一个 class:

@SpringBootApplication
@EnableDataFlowServer
public class WebServer {}

并将其设置为 AggregateApplicationBuilder 的父配置:

...
    new AggregateApplicationBuilder(WebServer.class, args).web(true)
...

如果我运行它,出现以下异常:

BeanCreationException: Error creating bean with name 'initH2TCPServer' ... 
Factory method 'initH2TCPServer' threw exception ... Exception opening port "19092" (port may be in use)

看起来 AggregateApplicationBuilder 进程试图创建另一个 H2 服务器,而不是使用来自父配置的服务器。

如果我在我的 JSONFileSourceApplication、ProcessorOne 和 LoggingSinkApplication classes 中将 @SpringBootApplication 注释替换为 @Configuration - 流应用程序启动,Web 服务器启动 (http://localhost:9393/dashboard),但我看不到我的流组件,网络中的所有选项卡 UI 都是空的。

如何运行 Spring Cloud Stream AggregateApplication with Web UI 启用?

今天的 SCDF 不支持聚合应用程序的概念。

主要原因是 SCDF 假设应用程序是已知的渠道类型;它是 input/output 或两者(对于处理器)。但是,在使用 AggregateApplicationBuilder 时,您可以通过多种方式组合频道,并且在 DSL/UI 中变得模糊,以便能够以自动方式发现和绑定到频道。

也就是说,在即将发布的版本中,

1) 我们计划引入"function-chaining"的概念。这允许在运行时将 "multiple" 小函数 (例如 filterNulls、transformToUppercase、splitByHypen、..) 组合到单个流应用程序中。作为一名开发人员,您将专注于 developing/testing 独立的功能并将它们注册到 SCDF。一旦在注册表中可用,您将拥有新的 DSL 原语以将它们组合成一个单元,该单元在内部由 SCDF 链接(在运行时)。

2) 我们计划提高 queues/topics 的知名度。将有 DSL 原语直接与它们交互和创建数据管道。鉴于这种灵活性,像用例这样的组合会更容易。