Spring 集成文件支持中的条件文件移动
Conditional File movement in Spring Integration File Support
我正在实施一个 Spring 集成工作流程,如下所示。
IntegrationFlows.from("inputFileProcessorChannel")
.split(fileSplitterSpec, spec -> {})
.transform(lineItemTransformer)
.handle(httpRequestExecutingMessageHandler)
.transform(reportDataAggregator)
.aggregate(aggregatorSpec -> aggregatorSpec.requiresReply(false))
.channel("reportGeneratorChannel")
.get();
现在,完成上述流程后,我需要将 input file
移动到存档目录。决定目标目录的决定是基于消息 header processingFailed
并且此 header 添加在流程的 .transform(reportDataAggregator)
步骤中。为了移动这个文件,我创建了另一个流程,如下面的代码
IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
.routeToRecipients(routerSpec -> {
routerSpec.recipient("processedFileMoverChannel", createMessageSelector(Boolean.FALSE))
.recipient("failedFileMoverChannel", createMessageSelector(Boolean.TRUE));
})
.get();
选择器方法
private MessageSelector createMessageSelector(Boolean ruleBoolean) {
return message -> ruleBoolean.equals(message.getHeaders().get("processingFailed"));
}
下方报告渠道流量
IntegrationFlows.from("reportGeneratorChannel")
.transform(executionReportTransformer)
.handle(reportWritingMessageHandlerSpec)
.get();
但是,正如此流程所预期的那样,文件移动未完成,因为所述 header 未出现在流程执行中。
那么,如何实现在创建报告文件后执行 file mover flow
的目标?
FileSplitter
为我们填充这个 headers 每一行产生:
@Override
protected boolean willAddHeaders(Message<?> message) {
Object payload = message.getPayload();
return payload instanceof File || payload instanceof String;
}
@Override
protected void addHeaders(Message<?> message, Map<String, Object> headers) {
File file = null;
if (message.getPayload() instanceof File) {
file = (File) message.getPayload();
}
else if (message.getPayload() instanceof String) {
file = new File((String) message.getPayload());
}
if (file != null) {
if (!headers.containsKey(FileHeaders.ORIGINAL_FILE)) {
headers.put(FileHeaders.ORIGINAL_FILE, file);
}
if (!headers.containsKey(FileHeaders.FILENAME)) {
headers.put(FileHeaders.FILENAME, file.getName());
}
}
}
因此,即使您完成了聚合并准备向其中发送消息 .channel("reportGeneratorChannel")
,您仍然可以访问那些 file-related headers.
将此 reportGeneratorChannel
设为 PublishSubscribeChannel
并将 "file mover flow" 移到那里,就可以解决问题。
顺便说一句:到目前为止,您在同一频道上的 IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
和第二个流会引导您进行 round-robin 调度。那不是 pub-sub 分布。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel
我正在实施一个 Spring 集成工作流程,如下所示。
IntegrationFlows.from("inputFileProcessorChannel")
.split(fileSplitterSpec, spec -> {})
.transform(lineItemTransformer)
.handle(httpRequestExecutingMessageHandler)
.transform(reportDataAggregator)
.aggregate(aggregatorSpec -> aggregatorSpec.requiresReply(false))
.channel("reportGeneratorChannel")
.get();
现在,完成上述流程后,我需要将 input file
移动到存档目录。决定目标目录的决定是基于消息 header processingFailed
并且此 header 添加在流程的 .transform(reportDataAggregator)
步骤中。为了移动这个文件,我创建了另一个流程,如下面的代码
IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
.routeToRecipients(routerSpec -> {
routerSpec.recipient("processedFileMoverChannel", createMessageSelector(Boolean.FALSE))
.recipient("failedFileMoverChannel", createMessageSelector(Boolean.TRUE));
})
.get();
选择器方法
private MessageSelector createMessageSelector(Boolean ruleBoolean) {
return message -> ruleBoolean.equals(message.getHeaders().get("processingFailed"));
}
下方报告渠道流量
IntegrationFlows.from("reportGeneratorChannel")
.transform(executionReportTransformer)
.handle(reportWritingMessageHandlerSpec)
.get();
但是,正如此流程所预期的那样,文件移动未完成,因为所述 header 未出现在流程执行中。
那么,如何实现在创建报告文件后执行 file mover flow
的目标?
FileSplitter
为我们填充这个 headers 每一行产生:
@Override
protected boolean willAddHeaders(Message<?> message) {
Object payload = message.getPayload();
return payload instanceof File || payload instanceof String;
}
@Override
protected void addHeaders(Message<?> message, Map<String, Object> headers) {
File file = null;
if (message.getPayload() instanceof File) {
file = (File) message.getPayload();
}
else if (message.getPayload() instanceof String) {
file = new File((String) message.getPayload());
}
if (file != null) {
if (!headers.containsKey(FileHeaders.ORIGINAL_FILE)) {
headers.put(FileHeaders.ORIGINAL_FILE, file);
}
if (!headers.containsKey(FileHeaders.FILENAME)) {
headers.put(FileHeaders.FILENAME, file.getName());
}
}
}
因此,即使您完成了聚合并准备向其中发送消息 .channel("reportGeneratorChannel")
,您仍然可以访问那些 file-related headers.
将此 reportGeneratorChannel
设为 PublishSubscribeChannel
并将 "file mover flow" 移到那里,就可以解决问题。
顺便说一句:到目前为止,您在同一频道上的 IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
和第二个流会引导您进行 round-robin 调度。那不是 pub-sub 分布。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel