Spring 集成邮件:在所有数据库插入后发送电子邮件

Spring Integration Mail: Send email after all database inserts

你好,我有一个集成流程,它逐行拆分文件,将每一行转换为 POJO,然后通过 JDBC 出站网关将该 POJO 插入数据库。

我希望能够在文件处理完成后发送一封电子邮件。 我目前在我的 jdbcOutboundGateway 之后发送到 smtpFlow 通道,但是这是在每次插入数据库后发送一封电子邮件。

这是我当前的流程 DSL

IntegrationFlow ftpFlow() {
    return IntegrationFlows.from(
            ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
            .split(splitFile())
            .transform(this::transformToIndividualScore)
            .handle(jdbcOutboundGateway(null))
            .channel("smtpFlow")
            .get();

如何让此流程在 jdbcOutboundGateway 中处理完所有文件后仅发送一封电子邮件?

这是我的splitFile()方法

@Bean
FileSplitter splitFile() {
    FileSplitter fs = new FileSplitter(true, false);
    fs.setFirstLineAsHeader("IndividualScore");
    return fs;

这是我的transformToIndividualScore方法

@Transformer
private IndividualScore transformToIndividualScore(String payload) {
    String[] values = payload.split(",");
    IndividualScore is = new IndividualScore();
    is.setScorecardDate(values[0]);
    is.setVnSpId(values[1]);
    is.setPrimaryCat(values[2]);
    is.setSecondaryCat(values[3]);
    is.setScore(Integer.parseInt(values[4]));
    is.setActual(values[5]);
    return is;
}

在 assemble 句柄后添加 .aggregate() 拆分结果返回到单个消息。

所以我的问题的解决方案,(有点)。

在我的 FileSplitter 上将迭代器标记为 false 现在允许排序 headers。

更新后的splitFile()低于

@Bean
FileSplitter splitFile() {
    FileSplitter fs = new FileSplitter(false, false);
    fs.setFirstLineAsHeader("IndividualScore");
    fs.setApplySequence(true);
    return fs;
}

我的直觉告诉我,默认的.aggregate()发布策略一定是消息header sequenceSize ==消息聚合列表。

创建 FileSplitteriterator 设置为 true 时,sequenceSize 设置为 0,这将永远不会实现默认的发布策略.aggregate()

然而,这使得 FileSplitter 使用 List 将文件的所有行存储在内存中。聚合器还在内存中存储另一个 ArrayList 行..

是否有更好的解决方案来创建自定义聚合器来处理 END FileMarker 以允许使用迭代器来拆分文件?

在@ArtemBilan 的帮助下

我能够按顺序使用 publishSubscribeChannel() 和链 2 subscribe() 方法是新的 IntegrationFlow

 @Bean
IntegrationFlow ftpFlow() {
    return IntegrationFlows.from(
            ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
            .publishSubscribeChannel(channel -> channel
                    .subscribe(
                        a -> a
                                .split(splitFile())
                                .transform(this::transformToIndividualScore)
                                .handle(jdbcMessageHandler(null)))
                    .subscribe(
                        b -> b
                                .transform(this::transformToSuccessEmail)
                                .handle(emailHandler()))
            )
            .get();