Spring 集成使用和方法验证
Spring Integration Usage and Approach Validation
我目前正在测试使用 Spring 集成将同一个 Spring-Boot 应用程序中的不同模块捆绑在一起,并将服务整合到一个从 single-entry 点开始的统一流程中。
如果可能,我正在寻找 Spring 集成的以下说明:
- 下面的代码是使用 DSL 构建流的正确方法吗?
- 在下面的 "C" 中,我可以将结果冒泡到 "B" 流吗?
- 使用 DSL 与 XML 哪个更好?
- 我对如何正确 "terminate" 流程感到困惑?
流程概述
在下面的代码中,我只是将页面发布到目的地。整体流程是这样的。
- 发布者流侦听负载并将其拆分为多个部分。
- 内容流过滤掉页面并将它们分成几个部分。
- AWS 流程订阅和处理部分。
- 文件流订阅和处理部分。
最终,发布者流程中可能会有其他非常不同类型的消费者,这些消费者不是内容,这就是我将发布者与内容分开的原因。
A) 发布流程 (publisher.jar):
这是我的 "main" 通过网关发起的流程。目的是将其用作开始触发所有发布流程的入口点。
- 收到消息
- 预处理消息并保存。
- 将有效载荷拆分为包含在其中的各个条目。
- 用其余数据丰富每个条目
- 将每个条目放入输出通道。
代码如下:
@Bean
IntegrationFlow flowPublish()
{
return f -> f
.channel(this.publishingInputChannel())
//Prepare the payload
.<Package>handle((p, h) -> this.save(p))
//Split the artifact resolved items
.split(Package.class, Package::getItems)
//Find the artifact associated to each item (if available)
.enrich(
e -> e.<PackageEntry>requestPayload(
m ->
{
final PackageEntry item = m.getPayload();
final Publishable publishable = this.findPublishable(item);
item.setPublishable(publishable);
return item;
}))
//Send the results to the output channel
.channel(this.publishingOutputChannel());
}
B) 内容流 (content.jar)
该模块的职责是处理传入的 "content" 有效负载(即本例中的页面)并将它们 split/route 发送给适当的订阅者。
- 在发布者输出频道上收听
- 仅按页面类型过滤条目
- 将原始有效载荷添加到 header 以备后用
- 将有效负载转换为实际类型
- 将页面拆分成各个元素(块)
- 将每个元素路由到适当的 PubSub 频道。
至少现在,订阅的流没有 return 任何响应 - 它们应该只是触发并忘记但是 我想知道如何在使用 pub-sub频道.
代码如下:
@Bean
@ContentChannel("asset")
MessageChannel contentAssetChannel()
{
return MessageChannels.publishSubscribe("assetPublisherChannel").get();
//return MessageChannels.queue(10).get();
}
@Bean
@ContentChannel("page")
MessageChannel contentPageChannel()
{
return MessageChannels.publishSubscribe("pagePublisherChannel").get();
//return MessageChannels.queue(10).get();
}
@Bean
IntegrationFlow flowPublishContent()
{
return flow -> flow
.channel(this.publishingChannel)
//Filter for root pages (which contain elements)
.filter(PackageEntry.class, p -> p.getPublishable() instanceof Page)
//Put the publishable details in the header
.enrichHeaders(e -> e.headerFunction("item", Message::getPayload))
//Transform the item to a Page
.transform(PackageEntry.class, PackageEntry::getPublishable)
//Split page into components and put the type in the header
.split(Page.class, this::splitPageElements)
//Route content based on type to the subscriber
.<PageContent, String>route(PageContent::getType, mapping -> mapping
.resolutionRequired(false)
.subFlowMapping("page", sf -> sf.channel(this.contentPageChannel()))
.subFlowMapping("image", sf -> sf.channel(this.contentAssetChannel()))
.defaultOutputToParentFlow())
.channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
}
C) AWS 内容 (aws-content.jar)
此模块是特定内容流的众多潜在订阅者之一。它根据发布到上面的路由通道单独处理每个元素。
- 订阅适当的频道。
- 适当地处理动作。
可以有多个订阅上述路由输出通道的流模块,这只是其中之一。
例如,"contentPageChannel" 可以调用下面的 flowPageToS3(在 aws 模块中)和 flowPageToFile(在另一个模块中)。
代码如下:
@Bean
IntegrationFlow flowAssetToS3()
{
return flow -> flow
.channel(this.assetChannel)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<PageContent>handle((p, h) ->
{
return this.publishS3Asset(p);
})));
}
@Bean
IntegrationFlow flowPageToS3()
{
return flow -> flow
.channel(this.pageChannel)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<Page>handle((p, h) -> this.publishS3Page(p))
.enrichHeaders(e -> e.header("s3Command", Command.UPLOAD.name()))
.handle(this.s3MessageHandler())));
}
首先你的问题内容很多:阅读时很难保留所有信息。那是你的项目,所以你应该对这个主题非常有信心。但对我们来说这是新事物,可能只是放弃阅读而不是已经尝试回答。
无论如何,我会在开始时尝试回答你的问题,尽管我觉得你会开始一个很长的讨论 "what?, how?, why?"...
Is the below code the right way to structure flows using the DSL?
这真的取决于你的逻辑。在逻辑组件之间区分它是个好主意,但在这件事上切断单独的 jar 可能会产生开销。查看您的代码,在我看来您仍然将所有内容收集到单个 Spring 引导应用程序中,并且只是 @Autowired
到 @Configuration
的适当通道。所以,是的,单独 @Configuration
是个好主意,但单独的 jar 是一种开销。恕我直言。
In "C" below, can i bubble up the result to the "B" flow?
好吧,既然故事是关于发布-订阅的,那么等待回复真的很不寻常。您将从这些订阅者那里得到多少回复?对,这就是问题所在——我们可以发送给许多订阅者,但我们无法从所有订阅者那里获得对单个 return 的回复。让我们回到 Java 代码:我们可以有多个方法参数,但我们只有一个 return
。这同样适用于消息传递。无论如何,您可以查看 Scatter-Gather 模式实现。
Is using the DSL vs. the XML the better approach?
两者都只是高级API。下面是相同的集成组件。查看您的应用程序,您会使用 XML 配置找到相同的分布式解决方案。没有理由退出 Java DSL。至少它不那么冗长,对你来说。
I am confused as to how to correctly "terminate" a flow?
您的详细描述绝对不清楚。如果您发送到 S3 或文件,那就是终止。这些组件没有回复,所以无处可去,无事可做。那就是停止。与 Java 方法和 void
相同。如果您担心您的入口点网关,那么只需 void
并且不要等待任何回复。有关详细信息,请参阅 Messaging Gateway。
我目前正在测试使用 Spring 集成将同一个 Spring-Boot 应用程序中的不同模块捆绑在一起,并将服务整合到一个从 single-entry 点开始的统一流程中。
如果可能,我正在寻找 Spring 集成的以下说明:
- 下面的代码是使用 DSL 构建流的正确方法吗?
- 在下面的 "C" 中,我可以将结果冒泡到 "B" 流吗?
- 使用 DSL 与 XML 哪个更好?
- 我对如何正确 "terminate" 流程感到困惑?
流程概述
在下面的代码中,我只是将页面发布到目的地。整体流程是这样的。
- 发布者流侦听负载并将其拆分为多个部分。
- 内容流过滤掉页面并将它们分成几个部分。
- AWS 流程订阅和处理部分。
- 文件流订阅和处理部分。
最终,发布者流程中可能会有其他非常不同类型的消费者,这些消费者不是内容,这就是我将发布者与内容分开的原因。
A) 发布流程 (publisher.jar):
这是我的 "main" 通过网关发起的流程。目的是将其用作开始触发所有发布流程的入口点。
- 收到消息
- 预处理消息并保存。
- 将有效载荷拆分为包含在其中的各个条目。
- 用其余数据丰富每个条目
- 将每个条目放入输出通道。
代码如下:
@Bean
IntegrationFlow flowPublish()
{
return f -> f
.channel(this.publishingInputChannel())
//Prepare the payload
.<Package>handle((p, h) -> this.save(p))
//Split the artifact resolved items
.split(Package.class, Package::getItems)
//Find the artifact associated to each item (if available)
.enrich(
e -> e.<PackageEntry>requestPayload(
m ->
{
final PackageEntry item = m.getPayload();
final Publishable publishable = this.findPublishable(item);
item.setPublishable(publishable);
return item;
}))
//Send the results to the output channel
.channel(this.publishingOutputChannel());
}
B) 内容流 (content.jar)
该模块的职责是处理传入的 "content" 有效负载(即本例中的页面)并将它们 split/route 发送给适当的订阅者。
- 在发布者输出频道上收听
- 仅按页面类型过滤条目
- 将原始有效载荷添加到 header 以备后用
- 将有效负载转换为实际类型
- 将页面拆分成各个元素(块)
- 将每个元素路由到适当的 PubSub 频道。
至少现在,订阅的流没有 return 任何响应 - 它们应该只是触发并忘记但是 我想知道如何在使用 pub-sub频道.
代码如下:
@Bean
@ContentChannel("asset")
MessageChannel contentAssetChannel()
{
return MessageChannels.publishSubscribe("assetPublisherChannel").get();
//return MessageChannels.queue(10).get();
}
@Bean
@ContentChannel("page")
MessageChannel contentPageChannel()
{
return MessageChannels.publishSubscribe("pagePublisherChannel").get();
//return MessageChannels.queue(10).get();
}
@Bean
IntegrationFlow flowPublishContent()
{
return flow -> flow
.channel(this.publishingChannel)
//Filter for root pages (which contain elements)
.filter(PackageEntry.class, p -> p.getPublishable() instanceof Page)
//Put the publishable details in the header
.enrichHeaders(e -> e.headerFunction("item", Message::getPayload))
//Transform the item to a Page
.transform(PackageEntry.class, PackageEntry::getPublishable)
//Split page into components and put the type in the header
.split(Page.class, this::splitPageElements)
//Route content based on type to the subscriber
.<PageContent, String>route(PageContent::getType, mapping -> mapping
.resolutionRequired(false)
.subFlowMapping("page", sf -> sf.channel(this.contentPageChannel()))
.subFlowMapping("image", sf -> sf.channel(this.contentAssetChannel()))
.defaultOutputToParentFlow())
.channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
}
C) AWS 内容 (aws-content.jar)
此模块是特定内容流的众多潜在订阅者之一。它根据发布到上面的路由通道单独处理每个元素。
- 订阅适当的频道。
- 适当地处理动作。
可以有多个订阅上述路由输出通道的流模块,这只是其中之一。
例如,"contentPageChannel" 可以调用下面的 flowPageToS3(在 aws 模块中)和 flowPageToFile(在另一个模块中)。
代码如下:
@Bean
IntegrationFlow flowAssetToS3()
{
return flow -> flow
.channel(this.assetChannel)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<PageContent>handle((p, h) ->
{
return this.publishS3Asset(p);
})));
}
@Bean
IntegrationFlow flowPageToS3()
{
return flow -> flow
.channel(this.pageChannel)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<Page>handle((p, h) -> this.publishS3Page(p))
.enrichHeaders(e -> e.header("s3Command", Command.UPLOAD.name()))
.handle(this.s3MessageHandler())));
}
首先你的问题内容很多:阅读时很难保留所有信息。那是你的项目,所以你应该对这个主题非常有信心。但对我们来说这是新事物,可能只是放弃阅读而不是已经尝试回答。
无论如何,我会在开始时尝试回答你的问题,尽管我觉得你会开始一个很长的讨论 "what?, how?, why?"...
Is the below code the right way to structure flows using the DSL?
这真的取决于你的逻辑。在逻辑组件之间区分它是个好主意,但在这件事上切断单独的 jar 可能会产生开销。查看您的代码,在我看来您仍然将所有内容收集到单个 Spring 引导应用程序中,并且只是 @Autowired
到 @Configuration
的适当通道。所以,是的,单独 @Configuration
是个好主意,但单独的 jar 是一种开销。恕我直言。
In "C" below, can i bubble up the result to the "B" flow?
好吧,既然故事是关于发布-订阅的,那么等待回复真的很不寻常。您将从这些订阅者那里得到多少回复?对,这就是问题所在——我们可以发送给许多订阅者,但我们无法从所有订阅者那里获得对单个 return 的回复。让我们回到 Java 代码:我们可以有多个方法参数,但我们只有一个 return
。这同样适用于消息传递。无论如何,您可以查看 Scatter-Gather 模式实现。
Is using the DSL vs. the XML the better approach?
两者都只是高级API。下面是相同的集成组件。查看您的应用程序,您会使用 XML 配置找到相同的分布式解决方案。没有理由退出 Java DSL。至少它不那么冗长,对你来说。
I am confused as to how to correctly "terminate" a flow?
您的详细描述绝对不清楚。如果您发送到 S3 或文件,那就是终止。这些组件没有回复,所以无处可去,无事可做。那就是停止。与 Java 方法和 void
相同。如果您担心您的入口点网关,那么只需 void
并且不要等待任何回复。有关详细信息,请参阅 Messaging Gateway。