Spring 聚合器聚合输入组

Spring Aggregator Aggregate groups of input

我想将数据聚合成组,我该怎么做?为此,我想使用聚合器(Spring 集成)。我的场景是这样的。

Spring Batch<-->Spring Integration File -->Reader-->Proccessor<-->[Gateway<->Aggregator]

{Male,Joe;Male,Dave;Female,Anne;Female,Jane}-->sequentialyto-->Gateway-->Aggregate-> Gender(Male,{Joe,Dave}...Gender{Female,{Anne,Jane})

发布策略是什么样的?我需要小代码片段

谢谢

我不确定您的情况下 Reader 是什么,但是 Spring 集成提供了一个 FileSplitter 工具。它的 markers 可用于确定聚合器已准备好发布。

我会说你需要有一个 releaseStrategy 作为 false:

release-strategy-expression="false"

这意味着您的群组不会在正常情况下发布。您无法确定准备发布的组的大小。

FileSplitter.FileMarker.Mark.END 在读取文件的最后一行后作为来自 FileSplitter 的最后一条消息与 MessageGroupStore.expireMessageGroups(0) 调用一起发出将使该聚合器运行良好。不过还需要配置:

 <xsd:attribute name="send-partial-result-on-expiry" type="xsd:string">
                <xsd:annotation>
                    <xsd:documentation>
                    Specifies whether messages that expired should be aggregated and sent to the 'output-channel' or 'replyChannel'.
                    Messages are expired when their containing MessageGroup expires. One of the ways of expiring MessageGroups is by
                    configuring a MessageGroupStoreReaper. However MessageGroups can alternatively be expired by simply calling
                    MessageGroupStore.expireMessageGroups(timeout). That could be accomplished via a ControlBus operation
                    or by simply invoking that method if you have a reference to the MessageGroupStore instance.
                    </xsd:documentation>
                </xsd:annotation>
            </xsd:attribute>

到期后正常释放组。