Spring 集成邮件:在所有数据库插入后发送电子邮件
Spring Integration Mail: Send email after all database inserts
你好,我有一个集成流程,它逐行拆分文件,将每一行转换为 POJO,然后通过 JDBC 出站网关将该 POJO 插入数据库。
我希望能够在文件处理完成后发送一封电子邮件。
我目前在我的 jdbcOutboundGateway 之后发送到 smtpFlow 通道,但是这是在每次插入数据库后发送一封电子邮件。
这是我当前的流程 DSL
IntegrationFlow ftpFlow() {
return IntegrationFlows.from(
ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
.split(splitFile())
.transform(this::transformToIndividualScore)
.handle(jdbcOutboundGateway(null))
.channel("smtpFlow")
.get();
如何让此流程在 jdbcOutboundGateway
中处理完所有文件后仅发送一封电子邮件?
这是我的splitFile()
方法
@Bean
FileSplitter splitFile() {
FileSplitter fs = new FileSplitter(true, false);
fs.setFirstLineAsHeader("IndividualScore");
return fs;
这是我的transformToIndividualScore
方法
@Transformer
private IndividualScore transformToIndividualScore(String payload) {
String[] values = payload.split(",");
IndividualScore is = new IndividualScore();
is.setScorecardDate(values[0]);
is.setVnSpId(values[1]);
is.setPrimaryCat(values[2]);
is.setSecondaryCat(values[3]);
is.setScore(Integer.parseInt(values[4]));
is.setActual(values[5]);
return is;
}
在 assemble 句柄后添加 .aggregate()
拆分结果返回到单个消息。
所以我的问题的解决方案,(有点)。
在我的 FileSplitter
上将迭代器标记为 false
现在允许排序 headers。
更新后的splitFile()
低于
@Bean
FileSplitter splitFile() {
FileSplitter fs = new FileSplitter(false, false);
fs.setFirstLineAsHeader("IndividualScore");
fs.setApplySequence(true);
return fs;
}
我的直觉告诉我,默认的.aggregate()
发布策略一定是消息header sequenceSize
==消息聚合列表。
创建 FileSplitter
且 iterator
设置为 true
时,sequenceSize
设置为 0
,这将永远不会实现默认的发布策略.aggregate()
然而,这使得 FileSplitter
使用 List
将文件的所有行存储在内存中。聚合器还在内存中存储另一个 ArrayList
行..
是否有更好的解决方案来创建自定义聚合器来处理 END
FileMarker
以允许使用迭代器来拆分文件?
在@ArtemBilan 的帮助下
我能够按顺序使用 publishSubscribeChannel()
和链 2 subscribe()
方法是新的 IntegrationFlow
@Bean
IntegrationFlow ftpFlow() {
return IntegrationFlows.from(
ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
.publishSubscribeChannel(channel -> channel
.subscribe(
a -> a
.split(splitFile())
.transform(this::transformToIndividualScore)
.handle(jdbcMessageHandler(null)))
.subscribe(
b -> b
.transform(this::transformToSuccessEmail)
.handle(emailHandler()))
)
.get();
你好,我有一个集成流程,它逐行拆分文件,将每一行转换为 POJO,然后通过 JDBC 出站网关将该 POJO 插入数据库。
我希望能够在文件处理完成后发送一封电子邮件。 我目前在我的 jdbcOutboundGateway 之后发送到 smtpFlow 通道,但是这是在每次插入数据库后发送一封电子邮件。
这是我当前的流程 DSL
IntegrationFlow ftpFlow() {
return IntegrationFlows.from(
ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
.split(splitFile())
.transform(this::transformToIndividualScore)
.handle(jdbcOutboundGateway(null))
.channel("smtpFlow")
.get();
如何让此流程在 jdbcOutboundGateway
中处理完所有文件后仅发送一封电子邮件?
这是我的splitFile()
方法
@Bean
FileSplitter splitFile() {
FileSplitter fs = new FileSplitter(true, false);
fs.setFirstLineAsHeader("IndividualScore");
return fs;
这是我的transformToIndividualScore
方法
@Transformer
private IndividualScore transformToIndividualScore(String payload) {
String[] values = payload.split(",");
IndividualScore is = new IndividualScore();
is.setScorecardDate(values[0]);
is.setVnSpId(values[1]);
is.setPrimaryCat(values[2]);
is.setSecondaryCat(values[3]);
is.setScore(Integer.parseInt(values[4]));
is.setActual(values[5]);
return is;
}
在 assemble 句柄后添加 .aggregate()
拆分结果返回到单个消息。
所以我的问题的解决方案,(有点)。
在我的 FileSplitter
上将迭代器标记为 false
现在允许排序 headers。
更新后的splitFile()
低于
@Bean
FileSplitter splitFile() {
FileSplitter fs = new FileSplitter(false, false);
fs.setFirstLineAsHeader("IndividualScore");
fs.setApplySequence(true);
return fs;
}
我的直觉告诉我,默认的.aggregate()
发布策略一定是消息header sequenceSize
==消息聚合列表。
创建 FileSplitter
且 iterator
设置为 true
时,sequenceSize
设置为 0
,这将永远不会实现默认的发布策略.aggregate()
然而,这使得 FileSplitter
使用 List
将文件的所有行存储在内存中。聚合器还在内存中存储另一个 ArrayList
行..
是否有更好的解决方案来创建自定义聚合器来处理 END
FileMarker
以允许使用迭代器来拆分文件?
在@ArtemBilan 的帮助下
我能够按顺序使用 publishSubscribeChannel()
和链 2 subscribe()
方法是新的 IntegrationFlow
@Bean
IntegrationFlow ftpFlow() {
return IntegrationFlows.from(
ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
.publishSubscribeChannel(channel -> channel
.subscribe(
a -> a
.split(splitFile())
.transform(this::transformToIndividualScore)
.handle(jdbcMessageHandler(null)))
.subscribe(
b -> b
.transform(this::transformToSuccessEmail)
.handle(emailHandler()))
)
.get();