使用网桥轮询器和任务执行器设置时未处理队列
Queue not being processed when setup with bridge poller and task executor
我正在尝试设置一个队列,该队列将由一个轮询器线程轮询,并将其内容移交给通过调度程序和任务执行器调用的某个服务。下面的代码是我想出的
<int:channel id="dataInQueue">
<int:priority-queue capacity="100" />
</int:channel>
<int:bridge input-channel="dataInQueue" output-channel="dataInProcesingQueue">
<int:poller receive-timeout="5000" fixed-rate="500" task-executor="taskScheduler" />
</int:bridge>
<int:router input-channel="dataInProcesingQueue" expression="payload.runType.id">
<int:mapping value="1" channel="processingQ1"/>
</int:router>
<int:channel id="processingQ1" >
<int:dispatcher task-executor="taskExecutor"/>
</int:channel>
<int:chain input-channel="processingQ1" output-channel="outChannel">
<int:service-activator ref="myService" />
</int:chain>
<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="maxPoolSize" value="20" />
<property name="corePoolSize" value="20" />
<property name="threadNamePrefix" value="My-TaskExecutor" />
</bean>
<bean id="taskScheduler"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="poolSize" value="20" />
<property name="threadNamePrefix" value="My-TaskScheduler" />
</bean>
不幸的是,这不起作用。如果我将一条消息放入队列,我看不到它正在处理。另一方面,如果我将 taskExecutor & taskScheduler 替换为 org.springframework.core.task.SimpleAsyncTaskExecutor 实现,那么一切都会开始工作。
看起来问题出在我的线程池配置上,但我看不到任何错误。
只需删除此 task-executor="taskScheduler"
。
轮询器已经在内部使用了内置的taskScheduler
; task-executor
属性适用于您希望立即切换到另一个线程的情况。
目前还不清楚为什么这会导致您的应用程序无法运行,但调度程序将其交给自己是多余的。
只需将其删除,调度程序将在路由器之后交给您的执行程序。
或者,在轮询器上设置 task-executor="taskExecutor"
并在 processingQ1
上删除调度程序 - 您不需要两次切换。
编辑:
就是说,我刚刚尝试了您的方案,它对我来说没问题(我看到了双重切换)...
11:05:43.623 DEBUG [My-TaskScheduler4][org.springframework.integration.endpoint.SourcePollingChannelAdapter] Poll resulted in Message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.628 DEBUG [My-TaskScheduler15][org.springframework.integration.channel.PriorityChannel] postReceive on channel 'dataInQueue', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.628 DEBUG [My-TaskScheduler4][org.springframework.integration.channel.PriorityChannel] postSend (sent=true) on channel 'dataInQueue', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.629 DEBUG [My-TaskScheduler15][org.springframework.integration.endpoint.PollingConsumer] Poll resulted in Message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.629 DEBUG [My-TaskScheduler15][org.springframework.integration.handler.BridgeHandler] org.springframework.integration.handler.BridgeHandler#0 received message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.630 DEBUG [My-TaskScheduler15][org.springframework.integration.channel.ExecutorChannel] postSend (sent=true) on channel 'toRabbit', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.631 DEBUG [My-TaskExecutor1][org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint] org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
如果按照我的建议还是不行,我建议你开启DEBUG日志记录(包括线程名称)并跟踪切换。
我正在尝试设置一个队列,该队列将由一个轮询器线程轮询,并将其内容移交给通过调度程序和任务执行器调用的某个服务。下面的代码是我想出的
<int:channel id="dataInQueue">
<int:priority-queue capacity="100" />
</int:channel>
<int:bridge input-channel="dataInQueue" output-channel="dataInProcesingQueue">
<int:poller receive-timeout="5000" fixed-rate="500" task-executor="taskScheduler" />
</int:bridge>
<int:router input-channel="dataInProcesingQueue" expression="payload.runType.id">
<int:mapping value="1" channel="processingQ1"/>
</int:router>
<int:channel id="processingQ1" >
<int:dispatcher task-executor="taskExecutor"/>
</int:channel>
<int:chain input-channel="processingQ1" output-channel="outChannel">
<int:service-activator ref="myService" />
</int:chain>
<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="maxPoolSize" value="20" />
<property name="corePoolSize" value="20" />
<property name="threadNamePrefix" value="My-TaskExecutor" />
</bean>
<bean id="taskScheduler"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="poolSize" value="20" />
<property name="threadNamePrefix" value="My-TaskScheduler" />
</bean>
不幸的是,这不起作用。如果我将一条消息放入队列,我看不到它正在处理。另一方面,如果我将 taskExecutor & taskScheduler 替换为 org.springframework.core.task.SimpleAsyncTaskExecutor 实现,那么一切都会开始工作。 看起来问题出在我的线程池配置上,但我看不到任何错误。
只需删除此 task-executor="taskScheduler"
。
轮询器已经在内部使用了内置的taskScheduler
; task-executor
属性适用于您希望立即切换到另一个线程的情况。
目前还不清楚为什么这会导致您的应用程序无法运行,但调度程序将其交给自己是多余的。
只需将其删除,调度程序将在路由器之后交给您的执行程序。
或者,在轮询器上设置 task-executor="taskExecutor"
并在 processingQ1
上删除调度程序 - 您不需要两次切换。
编辑:
就是说,我刚刚尝试了您的方案,它对我来说没问题(我看到了双重切换)...
11:05:43.623 DEBUG [My-TaskScheduler4][org.springframework.integration.endpoint.SourcePollingChannelAdapter] Poll resulted in Message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.628 DEBUG [My-TaskScheduler15][org.springframework.integration.channel.PriorityChannel] postReceive on channel 'dataInQueue', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.628 DEBUG [My-TaskScheduler4][org.springframework.integration.channel.PriorityChannel] postSend (sent=true) on channel 'dataInQueue', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.629 DEBUG [My-TaskScheduler15][org.springframework.integration.endpoint.PollingConsumer] Poll resulted in Message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.629 DEBUG [My-TaskScheduler15][org.springframework.integration.handler.BridgeHandler] org.springframework.integration.handler.BridgeHandler#0 received message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.630 DEBUG [My-TaskScheduler15][org.springframework.integration.channel.ExecutorChannel] postSend (sent=true) on channel 'toRabbit', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.631 DEBUG [My-TaskExecutor1][org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint] org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
如果按照我的建议还是不行,我建议你开启DEBUG日志记录(包括线程名称)并跟踪切换。