Spring 集成文件支持中的条件文件移动

Conditional File movement in Spring Integration File Support

我正在实施一个 Spring 集成工作流程,如下所示。

IntegrationFlows.from("inputFileProcessorChannel")
                           .split(fileSplitterSpec, spec -> {})
                           .transform(lineItemTransformer)
                           .handle(httpRequestExecutingMessageHandler)
                           .transform(reportDataAggregator)
                           .aggregate(aggregatorSpec -> aggregatorSpec.requiresReply(false))
                           .channel("reportGeneratorChannel")
                           .get();

现在,完成上述流程后,我需要将 input file 移动到存档目录。决定目标目录的决定是基于消息 header processingFailed 并且此 header 添加在流程的 .transform(reportDataAggregator) 步骤中。为了移动这个文件,我创建了另一个流程,如下面的代码

 IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
                           .routeToRecipients(routerSpec -> {
                               routerSpec.recipient("processedFileMoverChannel", createMessageSelector(Boolean.FALSE))
                                         .recipient("failedFileMoverChannel", createMessageSelector(Boolean.TRUE));
                           })

                           .get();  

选择器方法

 private MessageSelector createMessageSelector(Boolean ruleBoolean) {
    return message -> ruleBoolean.equals(message.getHeaders().get("processingFailed"));
}

下方报告渠道流量

IntegrationFlows.from("reportGeneratorChannel")
                           .transform(executionReportTransformer)
                           .handle(reportWritingMessageHandlerSpec)
                           .get();

但是,正如此流程所预期的那样,文件移动未完成,因为所述 header 未出现在流程执行中。

那么,如何实现在创建报告文件后执行 file mover flow 的目标?

FileSplitter 为我们填充这个 headers 每一行产生:

@Override
protected boolean willAddHeaders(Message<?> message) {
    Object payload = message.getPayload();
    return payload instanceof File || payload instanceof String;
}

@Override
protected void addHeaders(Message<?> message, Map<String, Object> headers) {
    File file = null;
    if (message.getPayload() instanceof File) {
        file = (File) message.getPayload();
    }
    else if (message.getPayload() instanceof String) {
        file = new File((String) message.getPayload());
    }
    if (file != null) {
        if (!headers.containsKey(FileHeaders.ORIGINAL_FILE)) {
            headers.put(FileHeaders.ORIGINAL_FILE, file);
        }
        if (!headers.containsKey(FileHeaders.FILENAME)) {
            headers.put(FileHeaders.FILENAME, file.getName());
        }
    }
}

因此,即使您完成了聚合并准备向其中发送消息 .channel("reportGeneratorChannel"),您仍然可以访问那些 file-related headers.

将此 reportGeneratorChannel 设为 PublishSubscribeChannel 并将 "file mover flow" 移到那里,就可以解决问题。

顺便说一句:到目前为止,您在同一频道上的 IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel")) 和第二个流会引导您进行 round-robin 调度。那不是 pub-sub 分布。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel