消息被丢弃,从未到达任务执行器线程
Messages getting dropped, never reached task executor thread
我今天注意到一个非常奇怪的问题,收到了消息但从未处理过。在下面的配置中,我有 5 个配置为从 JMS 队列读取的侦听器,我的窃听记录器打印出收到了 JMS 消息,但是没有任何痕迹表明它被任务执行程序线程处理,其余处理运行正常。它只是在某个时间点发生了几条消息。我有下面提供的所有相关配置;任何纠正它的帮助将不胜感激。
<jms:message-driven-channel-adapter
id="IN1"
destination="replyQueue"
channel="InChannel"
auto-startup="false"
max-messages-per-task="20"
receive-timeout="10000"
max-concurrent-consumers="1"
concurrent-consumers="1"
idle-consumer-limit="1"
idle-task-execution-limit="1"
recovery-interval="60000"/>
<jms:message-driven-channel-adapter
id="IN2"
destination="replyQueue"
channel="InChannel"
auto-startup="false"
max-messages-per-task="20"
receive-timeout="10000"
max-concurrent-consumers="1"
concurrent-consumers="1"
idle-consumer-limit="1"
idle-task-execution-limit="1"
recovery-interval="60000"/>
<int:channel id="InChannel">
<int:dispatcher task-executor="saveTaskExecutor"/>
<int:interceptors>
<int:wire-tap channel="MyLogger"/>
</int:interceptors>
</int:channel>
窃听记录器打印收到的消息。
<int:logging-channel-adapter logger-name="MsgInLogger" id="MyLogger" level="INFO" expression="'received payload for processing'+ payload " />
任务执行器定义如下,我的低估是,它有无限队列,所以不存在与消息数量相关的问题,当消息量最小时也会发生这种情况。
<beans:bean id="saveTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<beans:property name="corePoolSize" value="20" />
<beans:property name="maxPoolSize" value="20" />
</beans:bean>
saveTaskExecutor 线程将花费很长时间来处理一条消息,然后才能处理下一条消息,不确定那里是否发生任何超时或过期之类的事情。
不应该有任何方法让它们丢失,但为什么你要这样做而不是简单地将 concurrent-consumers
设置为 20 并让容器来处理你的流量?
你拥有它的方式;如果您的服务器崩溃,您将丢失执行者队列中的所有消息。您还应该设置 acknowledge-mode="transacted"
或使用 SimpleMessageListenerContainer
以避免在服务器崩溃期间丢失进程中的消息。
我今天注意到一个非常奇怪的问题,收到了消息但从未处理过。在下面的配置中,我有 5 个配置为从 JMS 队列读取的侦听器,我的窃听记录器打印出收到了 JMS 消息,但是没有任何痕迹表明它被任务执行程序线程处理,其余处理运行正常。它只是在某个时间点发生了几条消息。我有下面提供的所有相关配置;任何纠正它的帮助将不胜感激。
<jms:message-driven-channel-adapter
id="IN1"
destination="replyQueue"
channel="InChannel"
auto-startup="false"
max-messages-per-task="20"
receive-timeout="10000"
max-concurrent-consumers="1"
concurrent-consumers="1"
idle-consumer-limit="1"
idle-task-execution-limit="1"
recovery-interval="60000"/>
<jms:message-driven-channel-adapter
id="IN2"
destination="replyQueue"
channel="InChannel"
auto-startup="false"
max-messages-per-task="20"
receive-timeout="10000"
max-concurrent-consumers="1"
concurrent-consumers="1"
idle-consumer-limit="1"
idle-task-execution-limit="1"
recovery-interval="60000"/>
<int:channel id="InChannel">
<int:dispatcher task-executor="saveTaskExecutor"/>
<int:interceptors>
<int:wire-tap channel="MyLogger"/>
</int:interceptors>
</int:channel>
窃听记录器打印收到的消息。
<int:logging-channel-adapter logger-name="MsgInLogger" id="MyLogger" level="INFO" expression="'received payload for processing'+ payload " />
任务执行器定义如下,我的低估是,它有无限队列,所以不存在与消息数量相关的问题,当消息量最小时也会发生这种情况。
<beans:bean id="saveTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<beans:property name="corePoolSize" value="20" />
<beans:property name="maxPoolSize" value="20" />
</beans:bean>
saveTaskExecutor 线程将花费很长时间来处理一条消息,然后才能处理下一条消息,不确定那里是否发生任何超时或过期之类的事情。
不应该有任何方法让它们丢失,但为什么你要这样做而不是简单地将 concurrent-consumers
设置为 20 并让容器来处理你的流量?
你拥有它的方式;如果您的服务器崩溃,您将丢失执行者队列中的所有消息。您还应该设置 acknowledge-mode="transacted"
或使用 SimpleMessageListenerContainer
以避免在服务器崩溃期间丢失进程中的消息。