Spring 具有本地数据流服务器的 Cloud Stream AggregateApplication
Spring Cloud Stream AggregateApplication with local Data Flow Server
我正在尝试创建 Spring Cloud Stream 聚合应用程序,该应用程序 运行 带有数据流 Web 服务器,以便能够通过 Web UI.
管理应用程序
申请运行纳class:
@SpringBootApplication
public class Runner {
public static void main(String[] args) {
new AggregateApplicationBuilder(args).web(true)
.from(JSONFileSourceApplication.class).args("--fixedDelay=5000")
.via(ProcessorOne.class)
.to(LoggingSinkApplication.class).run(args);
}
这工作正常。现在尝试添加数据流服务器。创建一个 class:
@SpringBootApplication
@EnableDataFlowServer
public class WebServer {}
并将其设置为 AggregateApplicationBuilder 的父配置:
...
new AggregateApplicationBuilder(WebServer.class, args).web(true)
...
如果我运行它,出现以下异常:
BeanCreationException: Error creating bean with name 'initH2TCPServer' ...
Factory method 'initH2TCPServer' threw exception ... Exception opening port "19092" (port may be in use)
看起来 AggregateApplicationBuilder 进程试图创建另一个 H2 服务器,而不是使用来自父配置的服务器。
如果我在我的 JSONFileSourceApplication、ProcessorOne 和 LoggingSinkApplication classes 中将 @SpringBootApplication 注释替换为 @Configuration - 流应用程序启动,Web 服务器启动 (http://localhost:9393/dashboard),但我看不到我的流组件,网络中的所有选项卡 UI 都是空的。
如何运行 Spring Cloud Stream AggregateApplication with Web UI 启用?
今天的 SCDF 不支持聚合应用程序的概念。
主要原因是 SCDF 假设应用程序是已知的渠道类型;它是 input/output 或两者(对于处理器)。但是,在使用 AggregateApplicationBuilder
时,您可以通过多种方式组合频道,并且在 DSL/UI 中变得模糊,以便能够以自动方式发现和绑定到频道。
也就是说,在即将发布的版本中,
1) 我们计划引入"function-chaining"的概念。这允许在运行时将 "multiple" 小函数 (例如 filterNulls、transformToUppercase、splitByHypen、..) 组合到单个流应用程序中。作为一名开发人员,您将专注于 developing/testing 独立的功能并将它们注册到 SCDF。一旦在注册表中可用,您将拥有新的 DSL 原语以将它们组合成一个单元,该单元在内部由 SCDF 链接(在运行时)。
2) 我们计划提高 queues/topics 的知名度。将有 DSL 原语直接与它们交互和创建数据管道。鉴于这种灵活性,像用例这样的组合会更容易。
我正在尝试创建 Spring Cloud Stream 聚合应用程序,该应用程序 运行 带有数据流 Web 服务器,以便能够通过 Web UI.
管理应用程序申请运行纳class:
@SpringBootApplication
public class Runner {
public static void main(String[] args) {
new AggregateApplicationBuilder(args).web(true)
.from(JSONFileSourceApplication.class).args("--fixedDelay=5000")
.via(ProcessorOne.class)
.to(LoggingSinkApplication.class).run(args);
}
这工作正常。现在尝试添加数据流服务器。创建一个 class:
@SpringBootApplication
@EnableDataFlowServer
public class WebServer {}
并将其设置为 AggregateApplicationBuilder 的父配置:
...
new AggregateApplicationBuilder(WebServer.class, args).web(true)
...
如果我运行它,出现以下异常:
BeanCreationException: Error creating bean with name 'initH2TCPServer' ...
Factory method 'initH2TCPServer' threw exception ... Exception opening port "19092" (port may be in use)
看起来 AggregateApplicationBuilder 进程试图创建另一个 H2 服务器,而不是使用来自父配置的服务器。
如果我在我的 JSONFileSourceApplication、ProcessorOne 和 LoggingSinkApplication classes 中将 @SpringBootApplication 注释替换为 @Configuration - 流应用程序启动,Web 服务器启动 (http://localhost:9393/dashboard),但我看不到我的流组件,网络中的所有选项卡 UI 都是空的。
如何运行 Spring Cloud Stream AggregateApplication with Web UI 启用?
今天的 SCDF 不支持聚合应用程序的概念。
主要原因是 SCDF 假设应用程序是已知的渠道类型;它是 input/output 或两者(对于处理器)。但是,在使用 AggregateApplicationBuilder
时,您可以通过多种方式组合频道,并且在 DSL/UI 中变得模糊,以便能够以自动方式发现和绑定到频道。
也就是说,在即将发布的版本中,
1) 我们计划引入"function-chaining"的概念。这允许在运行时将 "multiple" 小函数 (例如 filterNulls、transformToUppercase、splitByHypen、..) 组合到单个流应用程序中。作为一名开发人员,您将专注于 developing/testing 独立的功能并将它们注册到 SCDF。一旦在注册表中可用,您将拥有新的 DSL 原语以将它们组合成一个单元,该单元在内部由 SCDF 链接(在运行时)。
2) 我们计划提高 queues/topics 的知名度。将有 DSL 原语直接与它们交互和创建数据管道。鉴于这种灵活性,像用例这样的组合会更容易。