使用共享资源拆分文件并处理每个部分

Split File and Process each part while using a shared resource

我正在使用 spring 集成来轮询文件。此单个文件包含多个报告。我想将文件拆分为报告文件并另存为不同的文件。

<int-file:inbound-channel-adapter id="filesIn"
        directory="file:${fileInDirectory}" 
        filename-pattern="*.txt" 
        prevent-duplicates="true">
    <int:poller id="poller" fixed-delay="5000"/>
    </int-file:inbound-channel-adapter>

<int:service-activator input-channel="filesIn"
                                   output-channel="filesOut"
                                   ref="handler"/>

<int-file:outbound-channel-adapter id="filesOut"
                                   directory="file:${archiveDirectory}"
                                   delete-source-files="true"/>

handler内部,handler内部的处理方法如下。

public List<ReportContent> splitTextToReports(File file){ 
     // split the file
     // store the file content text to ReportContent object
     // add to a List of ReportContent
}

ReportContent 有以下字段

每个 ReportContent 都需要进行另一个处理。

以下方法将处理上述方法返回的列表中的每个元素。

public void processReportContent (ReportContent reportContent){
   // process report content and save the file in the relevant place
}

问题分为两部分。

  1. 如何在读取第一个主文件后立即使用拆分器接管。这样每个报表的处理都可以完成拆分对象的一部分。
  2. 查找报告路径的服务应该在所有拆分的对象之间使用一个公共的 HashMap。如果此哈希映射中存在基于报告类型的值,它将从此映射中检索。否则,应执行单独的 API 调用以使用报告类型检索报告路径。从此 API 调用收到的报告类型和值(报告)将存储在地图中。 Map 的重要性在于避免进行不必要的 API 调用。

1。 而不是 <int:service-activator input-channel="filesIn"... 我会添加一个链条

<int:chain id="processor" input-channel="filesIn" output-channel="filesOut">
    <int:splitter>
        <bean class="...your impl of org.springframework.integration.splitter.AbstractMessageSplitter..."/>
    </int:splitter>
</int:chain>

并将您的 splitTextToReports 逻辑移动到此拆分器实现中。因此,在拆分器之后的链中,您将拥有 ReportContent 个实例的平坦流。

2。 在拆分器之后的链中添加转换步骤。将您的 processReportContent 逻辑放在这里。转换结果:有效负载中包含您的报告的字符串,'filename' 消息头变量中的文件名。

API你的变压器可能是这样的

interface ReportContentTransformer {
   Message<?> transform(ReportContent content);
}

链条看起来像

<int:chain id="processor" input-channel="filesIn" output-channel="filesOut">
    <int:splitter>
       ...
    </int:splitter>
    <int:transformer ref="...ref on your ReportContentTransformer interface implementation bean..." method="transform"/>
</int:chain>

3。 添加到您的 outbound-channel-adapter 属性

filename-generator-expression="headers.get('filename')"

在文件存储时使用来自 filename 变量的文件名。

为了并行处理项目,<splitter> 总是有一个技巧,就像下游 ExecutorChannel,所以在拆分项目的迭代过程中,我们在发送前一个项目后立即移动到下一个项目。

此外,为了提高吞吐量,splitter 支持 Iterator 流式传输。

我打算为你的任务建议 FileSplitter,但我猜你不是按行分割,而是按其他标识符。也许你的内容只是 XML 或 JSON,这样可以很容易地确定部分内容。

从这里开始,为您的案例提供一些 Iterator 实施可能并不那么容易。

不过我觉得没关系。您已经有了拆分逻辑并构建了您的 List<ReportContent>.

关于 ConcurrentMap

如何查看 @Cacheable Spring 对您的 "hard" 服务的支持,当下一次调用相同的键时将只是 return 值来自缓存?

为此,您可以在 <int-file:outbound-channel-adapter>:

上使用 directory-expression
<int-file:outbound-channel-adapter directory-expression="@reportPathService.getPath(payload)" /> 

您也可以接受同样的文件名技术。

注意:文件名注意默认header:FileHeaders.FILENAME.