使用共享资源拆分文件并处理每个部分
Split File and Process each part while using a shared resource
我正在使用 spring 集成来轮询文件。此单个文件包含多个报告。我想将文件拆分为报告文件并另存为不同的文件。
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${fileInDirectory}"
filename-pattern="*.txt"
prevent-duplicates="true">
<int:poller id="poller" fixed-delay="5000"/>
</int-file:inbound-channel-adapter>
<int:service-activator input-channel="filesIn"
output-channel="filesOut"
ref="handler"/>
<int-file:outbound-channel-adapter id="filesOut"
directory="file:${archiveDirectory}"
delete-source-files="true"/>
handler内部,handler内部的处理方法如下。
public List<ReportContent> splitTextToReports(File file){
// split the file
// store the file content text to ReportContent object
// add to a List of ReportContent
}
ReportContent 有以下字段
reportData(将保存在新文件中的文本)
报告类型
报告日期
每个 ReportContent 都需要进行另一个处理。
- 根据报表类型查找报表保存路径。这是通过服务调用完成的。
- 将报告数据保存在 table
以下方法将处理上述方法返回的列表中的每个元素。
public void processReportContent (ReportContent reportContent){
// process report content and save the file in the relevant place
}
问题分为两部分。
- 如何在读取第一个主文件后立即使用拆分器接管。这样每个报表的处理都可以完成拆分对象的一部分。
- 查找报告路径的服务应该在所有拆分的对象之间使用一个公共的 HashMap。如果此哈希映射中存在基于报告类型的值,它将从此映射中检索。否则,应执行单独的 API 调用以使用报告类型检索报告路径。从此 API 调用收到的报告类型和值(报告)将存储在地图中。 Map 的重要性在于避免进行不必要的 API 调用。
1。
而不是 <int:service-activator input-channel="filesIn"...
我会添加一个链条
<int:chain id="processor" input-channel="filesIn" output-channel="filesOut">
<int:splitter>
<bean class="...your impl of org.springframework.integration.splitter.AbstractMessageSplitter..."/>
</int:splitter>
</int:chain>
并将您的 splitTextToReports
逻辑移动到此拆分器实现中。因此,在拆分器之后的链中,您将拥有 ReportContent
个实例的平坦流。
2。
在拆分器之后的链中添加转换步骤。将您的 processReportContent
逻辑放在这里。转换结果:有效负载中包含您的报告的字符串,'filename' 消息头变量中的文件名。
API你的变压器可能是这样的
interface ReportContentTransformer {
Message<?> transform(ReportContent content);
}
链条看起来像
<int:chain id="processor" input-channel="filesIn" output-channel="filesOut">
<int:splitter>
...
</int:splitter>
<int:transformer ref="...ref on your ReportContentTransformer interface implementation bean..." method="transform"/>
</int:chain>
3。
添加到您的 outbound-channel-adapter
属性
filename-generator-expression="headers.get('filename')"
在文件存储时使用来自 filename
变量的文件名。
为了并行处理项目,<splitter>
总是有一个技巧,就像下游 ExecutorChannel
,所以在拆分项目的迭代过程中,我们在发送前一个项目后立即移动到下一个项目。
此外,为了提高吞吐量,splitter
支持 Iterator
流式传输。
我打算为你的任务建议 FileSplitter
,但我猜你不是按行分割,而是按其他标识符。也许你的内容只是 XML 或 JSON,这样可以很容易地确定部分内容。
从这里开始,为您的案例提供一些 Iterator
实施可能并不那么容易。
不过我觉得没关系。您已经有了拆分逻辑并构建了您的 List<ReportContent>
.
关于 ConcurrentMap
。
如何查看 @Cacheable
Spring 对您的 "hard" 服务的支持,当下一次调用相同的键时将只是 return 值来自缓存?
为此,您可以在 <int-file:outbound-channel-adapter>
:
上使用 directory-expression
<int-file:outbound-channel-adapter directory-expression="@reportPathService.getPath(payload)" />
您也可以接受同样的文件名技术。
注意:文件名注意默认header:FileHeaders.FILENAME
.
我正在使用 spring 集成来轮询文件。此单个文件包含多个报告。我想将文件拆分为报告文件并另存为不同的文件。
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${fileInDirectory}"
filename-pattern="*.txt"
prevent-duplicates="true">
<int:poller id="poller" fixed-delay="5000"/>
</int-file:inbound-channel-adapter>
<int:service-activator input-channel="filesIn"
output-channel="filesOut"
ref="handler"/>
<int-file:outbound-channel-adapter id="filesOut"
directory="file:${archiveDirectory}"
delete-source-files="true"/>
handler内部,handler内部的处理方法如下。
public List<ReportContent> splitTextToReports(File file){
// split the file
// store the file content text to ReportContent object
// add to a List of ReportContent
}
ReportContent 有以下字段
reportData(将保存在新文件中的文本)
报告类型
报告日期
每个 ReportContent 都需要进行另一个处理。
- 根据报表类型查找报表保存路径。这是通过服务调用完成的。
- 将报告数据保存在 table
以下方法将处理上述方法返回的列表中的每个元素。
public void processReportContent (ReportContent reportContent){
// process report content and save the file in the relevant place
}
问题分为两部分。
- 如何在读取第一个主文件后立即使用拆分器接管。这样每个报表的处理都可以完成拆分对象的一部分。
- 查找报告路径的服务应该在所有拆分的对象之间使用一个公共的 HashMap。如果此哈希映射中存在基于报告类型的值,它将从此映射中检索。否则,应执行单独的 API 调用以使用报告类型检索报告路径。从此 API 调用收到的报告类型和值(报告)将存储在地图中。 Map 的重要性在于避免进行不必要的 API 调用。
1。
而不是 <int:service-activator input-channel="filesIn"...
我会添加一个链条
<int:chain id="processor" input-channel="filesIn" output-channel="filesOut">
<int:splitter>
<bean class="...your impl of org.springframework.integration.splitter.AbstractMessageSplitter..."/>
</int:splitter>
</int:chain>
并将您的 splitTextToReports
逻辑移动到此拆分器实现中。因此,在拆分器之后的链中,您将拥有 ReportContent
个实例的平坦流。
2。
在拆分器之后的链中添加转换步骤。将您的 processReportContent
逻辑放在这里。转换结果:有效负载中包含您的报告的字符串,'filename' 消息头变量中的文件名。
API你的变压器可能是这样的
interface ReportContentTransformer {
Message<?> transform(ReportContent content);
}
链条看起来像
<int:chain id="processor" input-channel="filesIn" output-channel="filesOut">
<int:splitter>
...
</int:splitter>
<int:transformer ref="...ref on your ReportContentTransformer interface implementation bean..." method="transform"/>
</int:chain>
3。
添加到您的 outbound-channel-adapter
属性
filename-generator-expression="headers.get('filename')"
在文件存储时使用来自 filename
变量的文件名。
为了并行处理项目,<splitter>
总是有一个技巧,就像下游 ExecutorChannel
,所以在拆分项目的迭代过程中,我们在发送前一个项目后立即移动到下一个项目。
此外,为了提高吞吐量,splitter
支持 Iterator
流式传输。
我打算为你的任务建议 FileSplitter
,但我猜你不是按行分割,而是按其他标识符。也许你的内容只是 XML 或 JSON,这样可以很容易地确定部分内容。
从这里开始,为您的案例提供一些 Iterator
实施可能并不那么容易。
不过我觉得没关系。您已经有了拆分逻辑并构建了您的 List<ReportContent>
.
关于 ConcurrentMap
。
如何查看 @Cacheable
Spring 对您的 "hard" 服务的支持,当下一次调用相同的键时将只是 return 值来自缓存?
为此,您可以在 <int-file:outbound-channel-adapter>
:
directory-expression
<int-file:outbound-channel-adapter directory-expression="@reportPathService.getPath(payload)" />
您也可以接受同样的文件名技术。
注意:文件名注意默认header:FileHeaders.FILENAME
.