Spring Integration Aggregator 在特定情况下将 expired/timed 消息发送到错误的频道
Spring Integration Aggregator is sending expired/timed out messages to the wrong channel in a specific scenario
我将消息发送到聚合器的输入通道,然后聚合器将聚合后的消息发布到输出通道。聚合器至少需要 2 条消息(用于聚合),否则等待 10 秒超时。我也在使用 jdbc 消息存储。
以下是我测试过的场景。
场景 1 工作正常
发送消息 1 和 2 -> 输入通道 (input1) -> 聚合器 1 -> 输出通道 (output1)
场景 2 工作正常
发送消息 1 和 2 -> 输入通道 (input2) -> 聚合器 2 -> 输出通道 (output2)
场景 3 工作正常
仅发送消息 1 -> 输入通道 (input1) -> 聚合器 1 -> 输出通道 (output1)
场景 4 失败,因为它没有将过期消息发送到输出 2,而是发送到输出 1
仅发送消息 1 -> 输入通道 (input2) -> 聚合器 2 -> 输出通道 (output1)
谁能提出方案 4 失败的原因?
以下是我的配置
<int:service-activator ref="activator" method="output1_activator" input-channel="output1" />
<int:service-activator ref="activator" method="output2_activator" input-channel="output2" />
<int:aggregator input-channel="input1"
output-channel="output1"
ref="waiter"
expire-groups-upon-completion="true"
send-partial-result-on-expiry="true"
message-store="myJdbcMessageStore" />
<int:aggregator input-channel="input2"
output-channel="output2"
ref="waiter"
expire-groups-upon-completion="true"
send-partial-result-on-expiry="true"
message-store="myJdbcMessageStore" />
<bean id="aggregatorJdbcDataSource" class="o.s.j.d.DriverManagerDataSource"> ..... </bean>
<bean id="myJdbcMessageStore" class="org.springframework.integration.jdbc.JdbcMessageStore">
<constructor-arg index="0" ref="aggregatorJdbcDataSource" />
</bean>
<bean id="telMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper"> <property name="messageGroupStore" ref="myJdbcMessageStore" />
<property name="timeout" value="10000" />
</bean>
<task:scheduled-tasks>
<task:scheduled ref="telMessageStoreReaper" method="run" fixed-rate="5000" />
</task:scheduled-tasks>
您不能为两个聚合器使用相同的消息存储实例。收割者不知道哪个聚合器拥有该组。
您可以使用相同的表,但需要单独的消息存储实例;参见 partitioning a message store。
您需要两个商店,每个商店在不同的地区。
你也可以考虑使用 group-timeout
而不是收割者。
请将您的配置从评论移至主要问题,以便于其他人阅读。
我将消息发送到聚合器的输入通道,然后聚合器将聚合后的消息发布到输出通道。聚合器至少需要 2 条消息(用于聚合),否则等待 10 秒超时。我也在使用 jdbc 消息存储。
以下是我测试过的场景。
场景 1 工作正常
发送消息 1 和 2 -> 输入通道 (input1) -> 聚合器 1 -> 输出通道 (output1)
场景 2 工作正常
发送消息 1 和 2 -> 输入通道 (input2) -> 聚合器 2 -> 输出通道 (output2)
场景 3 工作正常
仅发送消息 1 -> 输入通道 (input1) -> 聚合器 1 -> 输出通道 (output1)
场景 4 失败,因为它没有将过期消息发送到输出 2,而是发送到输出 1
仅发送消息 1 -> 输入通道 (input2) -> 聚合器 2 -> 输出通道 (output1)
谁能提出方案 4 失败的原因?
以下是我的配置
<int:service-activator ref="activator" method="output1_activator" input-channel="output1" />
<int:service-activator ref="activator" method="output2_activator" input-channel="output2" />
<int:aggregator input-channel="input1"
output-channel="output1"
ref="waiter"
expire-groups-upon-completion="true"
send-partial-result-on-expiry="true"
message-store="myJdbcMessageStore" />
<int:aggregator input-channel="input2"
output-channel="output2"
ref="waiter"
expire-groups-upon-completion="true"
send-partial-result-on-expiry="true"
message-store="myJdbcMessageStore" />
<bean id="aggregatorJdbcDataSource" class="o.s.j.d.DriverManagerDataSource"> ..... </bean>
<bean id="myJdbcMessageStore" class="org.springframework.integration.jdbc.JdbcMessageStore">
<constructor-arg index="0" ref="aggregatorJdbcDataSource" />
</bean>
<bean id="telMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper"> <property name="messageGroupStore" ref="myJdbcMessageStore" />
<property name="timeout" value="10000" />
</bean>
<task:scheduled-tasks>
<task:scheduled ref="telMessageStoreReaper" method="run" fixed-rate="5000" />
</task:scheduled-tasks>
您不能为两个聚合器使用相同的消息存储实例。收割者不知道哪个聚合器拥有该组。
您可以使用相同的表,但需要单独的消息存储实例;参见 partitioning a message store。
您需要两个商店,每个商店在不同的地区。
你也可以考虑使用 group-timeout
而不是收割者。
请将您的配置从评论移至主要问题,以便于其他人阅读。