消息被丢弃,从未到达任务执行器线程

Messages getting dropped, never reached task executor thread

我今天注意到一个非常奇怪的问题,收到了消息但从未处理过。在下面的配置中,我有 5 个配置为从 JMS 队列读取的侦听器,我的窃听记录器打印出收到了 JMS 消息,但是没有任何痕迹表明它被任务执行程序线程处理,其余处理运行正常。它只是在某个时间点发生了几条消息。我有下面提供的所有相关配置;任何纠正它的帮助将不胜感激。

 <jms:message-driven-channel-adapter                             
                            id="IN1"                                
                            destination="replyQueue"  
                            channel="InChannel" 
                            auto-startup="false" 
                            max-messages-per-task="20"
                            receive-timeout="10000"
                            max-concurrent-consumers="1" 
                            concurrent-consumers="1"                                                            
                            idle-consumer-limit="1"
                            idle-task-execution-limit="1"                               
                            recovery-interval="60000"/>
<jms:message-driven-channel-adapter                                  
                            id="IN2"                                
                            destination="replyQueue"  
                            channel="InChannel" 
                            auto-startup="false" 
                            max-messages-per-task="20"
                            receive-timeout="10000"
                            max-concurrent-consumers="1" 
                            concurrent-consumers="1"                                                            
                            idle-consumer-limit="1"
                            idle-task-execution-limit="1"                               
                            recovery-interval="60000"/>




<int:channel id="InChannel">
<int:dispatcher task-executor="saveTaskExecutor"/>
    <int:interceptors>
        <int:wire-tap channel="MyLogger"/>
    </int:interceptors>
</int:channel>  

窃听记录器打印收到的消息。

<int:logging-channel-adapter logger-name="MsgInLogger" id="MyLogger" level="INFO" expression="'received payload for processing'+ payload " />   

任务执行器定义如下,我的低估是,它有无限队列,所以不存在与消息数量相关的问题,当消息量最小时也会发生这种情况。

<beans:bean id="saveTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<beans:property name="corePoolSize" value="20" />
<beans:property name="maxPoolSize" value="20" />
</beans:bean>  

saveTaskExecutor 线程将花费很长时间来处理一条消息,然后才能处理下一条消息,不确定那里是否发生任何超时或过期之类的事情。

不应该有任何方法让它们丢失,但为什么你要这样做而不是简单地将 concurrent-consumers 设置为 20 并让容器来处理你的流量?

你拥有它的方式;如果您的服务器崩溃,您将丢失执行者队列中的所有消息。您还应该设置 acknowledge-mode="transacted" 或使用 SimpleMessageListenerContainer 以避免在服务器崩溃期间丢失进程中的消息。