使用 Mule 和集合聚合器对不完整的数据集进行数据处理

Holding data processing for incomplete data sets with Mule and a collection-aggregator

我需要收集和处理由另一个组织生成的文件集。为简单起见,假设该集合由两个文件组成,一个摘要文件和一个详细文件,名称如下:SUM20150701.dat 和 DTL20150701.dat,这将构成日期 20150701 的集合。问题是,集合需要按顺序处理,并且从外部组织传输文件可能容易出错,例如文件可能丢失。如果发生这种情况,应该保留这组文件,以及找到的任何后续文件集。例如,在 mule 进程开始时,源文件夹中可能包含:SUM20150701.dat、SUM20150703.dat、DTL20150703.dat。也就是说,20150701 的数据集是不完整的,而 20150703 是完整的。我需要让两个数据集保持到 DTL20150701.dat 到达,然后按顺序处理它们。

在我的 mule 进程的这种简化形式中,监视源文件夹中的文件。找到后,它们将移至存档文件夹并使用日期作为序列和相关值传递给集合聚合器。一组完成后,它会移动到目标文件夹。收集器使用较长的超时时间以确保不处理不完整的集:

<file:connector name="File" autoDelete="false" streaming="false" validateConnections="true" doc:name="File">
    <file:expression-filename-parser />
</file:connector>

<file:connector name="File1" autoDelete="false" outputAppend="true" streaming="false" validateConnections="true" doc:name="File" />

<vm:connector name="VM" validateConnections="true" doc:name="VM">
    <receiver-threading-profile maxThreadsActive="1"></receiver-threading-profile>
</vm:connector>

<flow name="fileaggreFlow2" doc:name="fileaggreFlow2">
    <file:inbound-endpoint path="G:\SourceDir" moveToDirectory="g:\SourceDir\Archive" connector-ref="File1" doc:name="get-working-files"                            
             responseTimeout="10000" pollingFrequency="5000" fileAge="600000" >
        <file:filename-regex-filter pattern="DTL(.*).dat|SUM(.*).dat" caseSensitive="false"/>
    </file:inbound-endpoint>

    <message-properties-transformer overwrite="true" doc:name="Message Properties">
        <add-message-property key="MULE_CORRELATION_ID" value="#[message.inboundProperties.originalFilename.substring(5, message.inboundProperties.originalFilename.lastIndexOf('.'))]"/>
        <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2"/>
        <add-message-property key="MULE_CORRELATION_SEQUENCE" value="#[message.inboundProperties.originalFilename.substring(5, message.inboundProperties.originalFilename.lastIndexOf('.'))]"/>       
    </message-properties-transformer>

    <vm:outbound-endpoint exchange-pattern="one-way" path="Merge" doc:name="VM" connector-ref="VM"/>
</flow>


<flow name="fileaggreFlow1" doc:name="fileaggreFlow1" processingStrategy="synchronous">
    <vm:inbound-endpoint exchange-pattern="one-way" path="Merge" doc:name="VM" connector-ref="VM"/>

    <processor-chain doc:name="Processor Chain">
        <collection-aggregator timeout="1000000" failOnTimeout="true" doc:name="Collection Aggregator"/>

        <foreach doc:name="For Each">
            <file:outbound-endpoint path="G:\DestDir1" outputPattern="#[function:datestamp:yyyyMMdd.HHmmss].#[message.inboundProperties.originalFilename]" responseTimeout="10000" connector-ref="File1" doc:name="Destination"/>
        </foreach>
    </processor-chain>

如果所有集合都完整,这会正确处理按顺序找到的集合。它正确地等待不完整的集被填充,但不保留后续集,即在上面的示例中集 20150703 将处理,而 20150701 仍在等待 DTL 文件。

是否有设置或其他构造在存在未完成的较早集合时强制集合聚合器元素等待?

我将文件名的日期部分用于相关性和序列 ID,如果所有集合都已完成,它确实控制了按照我想要的顺序设置过程。日期是否不存在并不重要(如本例中的 20150702),只有现有文件按顺序处理并且集合必须完整。

最后,我无法让 Collection-Aggregator 执行此操作。为了克服这个问题,我构建了一个 Java class,其中包含用于 SUM 和 DTL 文件的映射,以关联 ID 作为键,以及一个排序的打开键列表。

然后 Java class 监视最小键上的完整集,并在该集可用于处理时向 Mule 流发信号。

在处理文件时必须将 Mule 流置于同步模式,以防止出现数据竞争情况。完成后,它会向 Java class 发送信号,表明处理已完成,并且可以从 list/Maps 中删除数据集,如果下一组已准备好处理,则会收到返回的指示.

它不是最漂亮的,我宁愿不为此使用自定义功能,但它完成了工作。