Spring Integration Aggregator 在特定情况下将 expired/timed 消息发送到错误的频道

Spring Integration Aggregator is sending expired/timed out messages to the wrong channel in a specific scenario

我将消息发送到聚合器的输入通道,然后聚合器将聚合后的消息发布到输出通道。聚合器至少需要 2 条消息(用于聚合),否则等待 10 秒超时。我也在使用 jdbc 消息存储。

以下是我测试过的场景。

场景 1 工作正常

发送消息 1 和 2 -> 输入通道 (input1) -> 聚合器 1 -> 输出通道 (output1)

场景 2 工作正常

发送消息 1 和 2 -> 输入通道 (input2) -> 聚合器 2 -> 输出通道 (output2)

场景 3 工作正常

仅发送消息 1 -> 输入通道 (input1) -> 聚合器 1 -> 输出通道 (output1)

场景 4 失败,因为它没有将过期消息发送到输出 2,而是发送到输出 1

仅发送消息 1 -> 输入通道 (input2) -> 聚合器 2 -> 输出通道 (output1)

谁能提出方案 4 失败的原因?

以下是我的配置

<int:service-activator ref="activator" method="output1_activator" input-channel="output1" /> 

<int:service-activator ref="activator" method="output2_activator" input-channel="output2" /> 

<int:aggregator input-channel="input1" 
    output-channel="output1" 
    ref="waiter" 
    expire-groups-upon-completion="true" 
    send-partial-result-on-expiry="true" 
    message-store="myJdbcMessageStore" /> 

<int:aggregator input-channel="input2" 
    output-channel="output2" 
    ref="waiter" 
    expire-groups-upon-completion="true" 
    send-partial-result-on-expiry="true" 
    message-store="myJdbcMessageStore" />

<bean id="aggregatorJdbcDataSource" class="o.s.j.d.DriverManagerDataSource"> ..... </bean> 

<bean id="myJdbcMessageStore" class="org.springframework.integration.jdbc.JdbcMessageStore"> 
    <constructor-arg index="0" ref="aggregatorJdbcDataSource" /> 
</bean> 

<bean id="telMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">     <property name="messageGroupStore" ref="myJdbcMessageStore" /> 
    <property name="timeout" value="10000" /> 
</bean> 

<task:scheduled-tasks> 
    <task:scheduled ref="telMessageStoreReaper" method="run" fixed-rate="5000" /> 
</task:scheduled-tasks>

您不能为两个聚合器使用相同的消息存储实例。收割者不知道哪个聚合器拥有该组。

您可以使用相同的表,但需要单独的消息存储实例;参见 partitioning a message store

您需要两个商店,每个商店在不同的地区。

你也可以考虑使用 group-timeout 而不是收割者。

请将您的配置从评论移至主要问题,以便于其他人阅读。