Spring 集成使用和方法验证

Spring Integration Usage and Approach Validation

我目前正在测试使用 Spring 集成将同一个 Spring-Boot 应用程序中的不同模块捆绑在一起,并将服务整合到一个从 single-entry 点开始的统一流程中。

如果可能,我正在寻找 Spring 集成的以下说明:

  1. 下面的代码是使用 DSL 构建流的正确方法吗?
  2. 在下面的 "C" 中,我可以将结果冒泡到 "B" 流吗?
  3. 使用 DSL 与 XML 哪个更好?
  4. 我对如何正确 "terminate" 流程感到困惑?

流程概述

在下面的代码中,我只是将页面发布到目的地。整体流程是这样的。

  1. 发布者流侦听负载并将其拆分为多个部分。
  2. 内容流过滤掉页面并将它们分成几个部分。
    1. AWS 流程订阅和处理部分。
    2. 文件流订阅和处理部分。

最终,发布者流程中可能会有其他非常不同类型的消费者,这些消费者不是内容,这就是我将发布者与内容分开的原因。

A) 发布流程 (publisher.jar):

这是我的 "main" 通过网关发起的流程。目的是将其用作开始触发所有发布流程的入口点。

  1. 收到消息
  2. 预处理消息并保存。
  3. 将有效载荷拆分为包含在其中的各个条目。
  4. 用其余数据丰富每个条目
  5. 将每个条目放入输出通道。

代码如下:

@Bean
IntegrationFlow flowPublish()
{
    return f -> f
        .channel(this.publishingInputChannel())
        //Prepare the payload
        .<Package>handle((p, h) -> this.save(p))
        //Split the artifact resolved items
        .split(Package.class, Package::getItems)
        //Find the artifact associated to each item (if available)
        .enrich(
            e -> e.<PackageEntry>requestPayload(
                m ->
                {
                    final PackageEntry item = m.getPayload();
                    final Publishable publishable = this.findPublishable(item);
                    item.setPublishable(publishable);
                    return item;
                }))
        //Send the results to the output channel
        .channel(this.publishingOutputChannel());
}

B) 内容流 (content.jar)

该模块的职责是处理传入的 "content" 有效负载(即本例中的页面)并将它们 split/route 发送给适当的订阅者。

  1. 在发布者输出频道上收听
  2. 仅按页面类型过滤条目
  3. 将原始有效载荷添加到 header 以备后用
  4. 将有效负载转换为实际类型
  5. 将页面拆分成各个元素(块)
  6. 将每个元素路由到适当的 PubSub 频道。

至少现在,订阅的流没有 return 任何响应 - 它们应该只是触发并忘记但是 我想知道如何在使用 pub-sub频道.

代码如下:

@Bean
@ContentChannel("asset")
MessageChannel contentAssetChannel()
{
    return MessageChannels.publishSubscribe("assetPublisherChannel").get();

    //return MessageChannels.queue(10).get();
}

@Bean
@ContentChannel("page")
MessageChannel contentPageChannel()
{
    return MessageChannels.publishSubscribe("pagePublisherChannel").get();

    //return MessageChannels.queue(10).get();
}

@Bean
IntegrationFlow flowPublishContent()
{
    return flow -> flow
        .channel(this.publishingChannel)
        //Filter for root pages (which contain elements)
        .filter(PackageEntry.class, p -> p.getPublishable() instanceof Page)
        //Put the publishable details in the header
        .enrichHeaders(e -> e.headerFunction("item", Message::getPayload))
        //Transform the item to a Page
        .transform(PackageEntry.class, PackageEntry::getPublishable)
        //Split page into components and put the type in the header
        .split(Page.class, this::splitPageElements)
        //Route content based on type to the subscriber
        .<PageContent, String>route(PageContent::getType, mapping -> mapping
            .resolutionRequired(false)
            .subFlowMapping("page", sf -> sf.channel(this.contentPageChannel()))
            .subFlowMapping("image", sf -> sf.channel(this.contentAssetChannel()))
            .defaultOutputToParentFlow())
        .channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
}

C) AWS 内容 (aws-content.jar)

此模块是特定内容流的众多潜在订阅者之一。它根据发布到上面的路由通道单独处理每个元素。

  1. 订阅适当的频道。
  2. 适当地处理动作。

可以有多个订阅上述路由输出通道的流模块,这只是其中之一。

例如,"contentPageChannel" 可以调用下面的 flowPageToS3(在 aws 模块中)和 flowPageToFile(在另一个模块中)。

代码如下:

@Bean
IntegrationFlow flowAssetToS3()
{
    return flow -> flow
        .channel(this.assetChannel)
        .publishSubscribeChannel(c -> c
            .subscribe(s -> s
                .<PageContent>handle((p, h) ->
                                     {
                                         return this.publishS3Asset(p);
                                     })));
}

@Bean
IntegrationFlow flowPageToS3()
{
    return flow -> flow
        .channel(this.pageChannel)
        .publishSubscribeChannel(c -> c
            .subscribe(s -> s
                .<Page>handle((p, h) -> this.publishS3Page(p))
                .enrichHeaders(e -> e.header("s3Command", Command.UPLOAD.name()))
                .handle(this.s3MessageHandler())));
}

首先你的问题内容很多:阅读时很难保留所有信息。那是你的项目,所以你应该对这个主题非常有信心。但对我们来说这是新事物,可能只是放弃阅读而不是已经尝试回答。

无论如何,我会在开始时尝试回答你的问题,尽管我觉得你会开始一个很长的讨论 "what?, how?, why?"...

Is the below code the right way to structure flows using the DSL?

这真的取决于你的逻辑。在逻辑组件之间区分它是个好主意,但在这件事上切断单独的 jar 可能会产生开销。查看您的代码,在我看来您仍然将所有内容收集到单个 Spring 引导应用程序中,并且只是 @Autowired@Configuration 的适当通道。所以,是的,单独 @Configuration 是个好主意,但单独的 jar 是一种开销。恕我直言。

In "C" below, can i bubble up the result to the "B" flow?

好吧,既然故事是关于发布-订阅的,那么等待回复真的很不寻常。您将从这些订阅者那里得到多少回复?对,这就是问题所在——我们可以发送给许多订阅者,但我们无法从所有订阅者那里获得对单个 return 的回复。让我们回到 Java 代码:我们可以有多个方法参数,但我们只有一个 return。这同样适用于消息传递。无论如何,您可以查看 Scatter-Gather 模式实现。

Is using the DSL vs. the XML the better approach?

两者都只是高级API。下面是相同的集成组件。查看您的应用程序,您会使用 XML 配置找到相同的分布式解决方案。没有理由退出 Java DSL。至少它不那么冗长,对你来说。

I am confused as to how to correctly "terminate" a flow?

您的详细描述绝对不清楚。如果您发送到 S3 或文件,那就是终止。这些组件没有回复,所以无处可去,无事可做。那就是停止。与 Java 方法和 void 相同。如果您担心您的入口点网关,那么只需 void 并且不要等待任何回复。有关详细信息,请参阅 Messaging Gateway