使用网桥轮询器和任务执行器设置时未处理队列

Queue not being processed when setup with bridge poller and task executor

我正在尝试设置一个队列,该队列将由一个轮询器线程轮询,并将其内容移交给通过调度程序和任务执行器调用的某个服务。下面的代码是我想出的

<int:channel id="dataInQueue">
    <int:priority-queue capacity="100" />
</int:channel>

<int:bridge input-channel="dataInQueue" output-channel="dataInProcesingQueue">
    <int:poller receive-timeout="5000" fixed-rate="500" task-executor="taskScheduler" />
</int:bridge>   

<int:router input-channel="dataInProcesingQueue" expression="payload.runType.id">
    <int:mapping value="1" channel="processingQ1"/>
</int:router>   

<int:channel id="processingQ1" >
    <int:dispatcher task-executor="taskExecutor"/>
</int:channel>

<int:chain input-channel="processingQ1" output-channel="outChannel">
    <int:service-activator ref="myService"  />
</int:chain>    

<bean id="taskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="maxPoolSize" value="20" />
        <property name="corePoolSize" value="20" />
        <property name="threadNamePrefix" value="My-TaskExecutor" />                
</bean>
<bean id="taskScheduler"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
        <property name="poolSize" value="20" />
        <property name="threadNamePrefix" value="My-TaskScheduler" />
</bean> 

不幸的是,这不起作用。如果我将一条消息放入队列,我看不到它正在处理。另一方面,如果我将 taskExecutor & taskScheduler 替换为 org.springframework.core.task.SimpleAsyncTaskExecutor 实现,那么一切都会开始工作。 看起来问题出在我的线程池配置上,但我看不到任何错误。

只需删除此 task-executor="taskScheduler"

轮询器已经在内部使用了内置的taskSchedulertask-executor 属性适用于您希望立即切换到另一个线程的情况。

目前还不清楚为什么这会导致您的应用程序无法运行,但调度程序将其交给自己是多余的。

只需将其删除,调度程序将在路由器之后交给您的执行程序。

或者,在轮询器上设置 task-executor="taskExecutor" 并在 processingQ1 上删除调度程序 - 您不需要两次切换。

编辑:

就是说,我刚刚尝试了您的方案,它对我来说没问题(我看到了双重切换)...

11:05:43.623 DEBUG [My-TaskScheduler4][org.springframework.integration.endpoint.SourcePollingChannelAdapter] Poll resulted in Message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.628 DEBUG [My-TaskScheduler15][org.springframework.integration.channel.PriorityChannel] postReceive on channel 'dataInQueue', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.628 DEBUG [My-TaskScheduler4][org.springframework.integration.channel.PriorityChannel] postSend (sent=true) on channel 'dataInQueue', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.629 DEBUG [My-TaskScheduler15][org.springframework.integration.endpoint.PollingConsumer] Poll resulted in Message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.629 DEBUG [My-TaskScheduler15][org.springframework.integration.handler.BridgeHandler] org.springframework.integration.handler.BridgeHandler#0 received message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.630 DEBUG [My-TaskScheduler15][org.springframework.integration.channel.ExecutorChannel] postSend (sent=true) on channel 'toRabbit', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.631 DEBUG [My-TaskExecutor1][org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint] org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]

如果按照我的建议还是不行,我建议你开启DEBUG日志记录(包括线程名称)并跟踪切换。