Spring 集成轮询器以错误的时间间隔触发
Spring integration poller triggering with the wrong time interval
代码:https://github.com/giuliopulina/spring-integration-poller
我在尝试使用 Spring 集成创建 jdbc 轮询器时遇到问题。
当我向 table 提供新数据时,处理速度比预期慢:一切正常,除了轮询每 60 秒触发一次,我不明白为什么。
2015-05-27 10:50:40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - 解析表达式#root.![pk] 到(pks 列表)
2015-05-27 10:51:40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - 解析表达式#root.![pk] 到(pks 列表)
这是 spring 集成配置的相关部分 xml:
<task:executor id="pollerPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/>
<!--<task:executor id="processingPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/> -->
<bean id="jdbcSource" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
<constructor-arg ref="dataSource"/>
<constructor-arg value="XXXXXXXXXXXXXX"/>
<property name="updateSql" value="XXXXXXXXXXXXXXXX"/>
<property name="maxRowsPerPoll" value="50"/>
</bean>
<int:inbound-channel-adapter send-timeout="10000" auto-startup="false" id="inboundAdapter" ref="jdbcSource" channel="jdbcOutputChannel">
<int:poller receive-timeout="3000" time-unit="MILLISECONDS" fixed-rate="0" error-channel="errorChannel" task-executor="pollerPool">
<int:advice-chain>
<ref bean="threadPrepareInterceptor"/>
<ref bean="txAdvice"/>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<int:service-activator id="serviceActivator" input-channel="jdbcOutputChannel" ref="someServiceActivatorBean"/>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
<int:channel id="jdbcOutputChannel" >
<!-- using direct channel -->
<!--<int:dispatcher task-executor="processingPool"/>-->
</int:channel>
你能帮我理解这个问题吗?
更新:
关于 "jdbcOutputChannel" 关于交易的建议,我同意并根据你的提示修改了我的配置,因为它更干净(无论如何,服务激活器也在单独的交易中 运行,即使不是' xml 样本中概述了)。
关于我遇到的问题,我尝试删除所有其他 spring 集成组件,并且轮询器如我预期的那样连续触发(我知道 fixed-rate=0 太高了:))
相反,当项目中的其他轮询器像这样配置时,我的轮询器似乎也继承了相同的超时:
<int:service-activator id="someOtherServiceActivator">
<int:poller fixed-rate="0" error-channel="someOtherPollerErrorChannel" receive-timeout="60000" />
</int:service-activator>
将其他轮询器的超时切换为 10000 毫秒,我的轮询器也每 10 秒触发一次(而不是 60 秒)。
我无法分享完整的 spring 集成配置,但我想问一下:完全分离的 poller 是否可以相互修改行为?
更新 2:
我创建了一个单独的项目试图重现该问题,但我仍然无法做到。
因此,我尝试 删除 以下配置,引入它是为了仅在应用程序完全启动且 运行:
时启动轮询器
<int:publish-subscribe-channel id="startupChannel" />
<int:control-bus input-channel="controlBusChannel" />
<int-event:inbound-channel-adapter channel="startupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int:transformer input-channel="startupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="startupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" />
然后问题就消失了,即使这样我也能完全理解原因。
无论如何,为我的轮询器创建一个不同的 startupChannel 效果很好:
<int:publish-subscribe-channel id="globalStartupChannel" />
<int:publish-subscribe-channel id="myStartupChannel" />
<int:control-bus input-channel="controlBusChannel" />
<int-event:inbound-channel-adapter channel="globalStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int-event:inbound-channel-adapter channel="myStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int:transformer input-channel="globalStartupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="myStartupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />
更新 3:
在为您准备带有代码的项目时,我注意到以下日志:
信息:没有明确定义名为 'taskScheduler' 的 bean。因此,会创建一个默认的ThreadPoolTaskScheduler。
所以,我添加了以下配置,现在一切正常:
<task:scheduler id="taskScheduler" pool-size="20" />
我猜默认池大小是 10,所以在 totalNumberOfPollers > taskScheduler.size() 时,以某种方式覆盖配置。
我说得对吗?
谢谢
朱利奥
我无法重现你的情况;我建议您在轮询之间进行线程转储以查看线程在做什么。
就是说,fixed-rate
的 0 非常激进;您的 DBA 可能会像那样毫不拖延地进行合适的轮询。
此外,jdbcOutputChannel
是一个 ExecutorChannel
,意味着事务将在消息发送到该通道后立即提交。如果您希望在交易中流向 运行,则不应在此处使用调度程序。
编辑:
我仍然无法用这个重现你的情况...
<int:control-bus input-channel="input"/>
<int-event:inbound-channel-adapter channel="ps" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int:publish-subscribe-channel id="ps" />
<int:transformer input-channel="ps" output-channel="input" expression="'@foo.start()'" />
<int:transformer input-channel="ps" output-channel="input" expression="'@sa.start()'" />
<int:inbound-channel-adapter id="foo" channel="bar" expression="'foo'" auto-startup="false">
<int:poller fixed-rate="1000" />
</int:inbound-channel-adapter>
<int:channel id="bar">
<int:queue />
</int:channel>
<int:service-activator id="sa" input-channel="bar" output-channel="baz" auto-startup="false"
expression="payload.toUpperCase()">
<int:poller fixed-rate="6000" receive-timeout="0" />
</int:service-activator>
<int:logging-channel-adapter id="baz" level="ERROR"/>
...正如预期的那样,我每 6 秒看到 6 FOO
s(i-c-a
每秒轮询一次,而 sa 运行s 每 6 秒轮询一次)。
EDIT2:
我查看了您的项目,正如您所说,问题的根本原因是许多轮询端点,但实际上是这样的:
fixed-rate="0" receive-timeout="60000"
使用此配置,调度程序资源(线程)在 QueueChannel
中被阻塞,正如您所发现的,您已经耗尽了所有资源。
一种解决方案是增加调度程序池中的线程数。
使用此配置,您似乎正在尝试通过让轮询器在队列 receive() 方法中不断等待来获得按需、零延迟的消息传递。
如果您无法承受任何延迟,请考虑改用 DirectChannel
。如果您不希望下游端点在调用者的线程上 运行,请使用 ExecutorChannel
s...
<task:executor id="exec" pool-size="100"/>
<int:channel id="otherMessageChannel1">
<int:dispatcher task-executor="exec" />
</int:channel>
这通常优于您当前的设置。
代码:https://github.com/giuliopulina/spring-integration-poller
我在尝试使用 Spring 集成创建 jdbc 轮询器时遇到问题。
当我向 table 提供新数据时,处理速度比预期慢:一切正常,除了轮询每 60 秒触发一次,我不明白为什么。
2015-05-27 10:50:40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - 解析表达式#root.![pk] 到(pks 列表)
2015-05-27 10:51:40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - 解析表达式#root.![pk] 到(pks 列表)
这是 spring 集成配置的相关部分 xml:
<task:executor id="pollerPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/>
<!--<task:executor id="processingPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/> -->
<bean id="jdbcSource" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
<constructor-arg ref="dataSource"/>
<constructor-arg value="XXXXXXXXXXXXXX"/>
<property name="updateSql" value="XXXXXXXXXXXXXXXX"/>
<property name="maxRowsPerPoll" value="50"/>
</bean>
<int:inbound-channel-adapter send-timeout="10000" auto-startup="false" id="inboundAdapter" ref="jdbcSource" channel="jdbcOutputChannel">
<int:poller receive-timeout="3000" time-unit="MILLISECONDS" fixed-rate="0" error-channel="errorChannel" task-executor="pollerPool">
<int:advice-chain>
<ref bean="threadPrepareInterceptor"/>
<ref bean="txAdvice"/>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<int:service-activator id="serviceActivator" input-channel="jdbcOutputChannel" ref="someServiceActivatorBean"/>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
<int:channel id="jdbcOutputChannel" >
<!-- using direct channel -->
<!--<int:dispatcher task-executor="processingPool"/>-->
</int:channel>
你能帮我理解这个问题吗?
更新:
关于 "jdbcOutputChannel" 关于交易的建议,我同意并根据你的提示修改了我的配置,因为它更干净(无论如何,服务激活器也在单独的交易中 运行,即使不是' xml 样本中概述了)。
关于我遇到的问题,我尝试删除所有其他 spring 集成组件,并且轮询器如我预期的那样连续触发(我知道 fixed-rate=0 太高了:)) 相反,当项目中的其他轮询器像这样配置时,我的轮询器似乎也继承了相同的超时:
<int:service-activator id="someOtherServiceActivator">
<int:poller fixed-rate="0" error-channel="someOtherPollerErrorChannel" receive-timeout="60000" />
</int:service-activator>
将其他轮询器的超时切换为 10000 毫秒,我的轮询器也每 10 秒触发一次(而不是 60 秒)。 我无法分享完整的 spring 集成配置,但我想问一下:完全分离的 poller 是否可以相互修改行为?
更新 2: 我创建了一个单独的项目试图重现该问题,但我仍然无法做到。 因此,我尝试 删除 以下配置,引入它是为了仅在应用程序完全启动且 运行:
时启动轮询器<int:publish-subscribe-channel id="startupChannel" />
<int:control-bus input-channel="controlBusChannel" />
<int-event:inbound-channel-adapter channel="startupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int:transformer input-channel="startupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="startupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" />
然后问题就消失了,即使这样我也能完全理解原因。 无论如何,为我的轮询器创建一个不同的 startupChannel 效果很好:
<int:publish-subscribe-channel id="globalStartupChannel" />
<int:publish-subscribe-channel id="myStartupChannel" />
<int:control-bus input-channel="controlBusChannel" />
<int-event:inbound-channel-adapter channel="globalStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int-event:inbound-channel-adapter channel="myStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int:transformer input-channel="globalStartupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="myStartupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />
更新 3:
在为您准备带有代码的项目时,我注意到以下日志:
信息:没有明确定义名为 'taskScheduler' 的 bean。因此,会创建一个默认的ThreadPoolTaskScheduler。
所以,我添加了以下配置,现在一切正常:
<task:scheduler id="taskScheduler" pool-size="20" />
我猜默认池大小是 10,所以在 totalNumberOfPollers > taskScheduler.size() 时,以某种方式覆盖配置。 我说得对吗?
谢谢 朱利奥
我无法重现你的情况;我建议您在轮询之间进行线程转储以查看线程在做什么。
就是说,fixed-rate
的 0 非常激进;您的 DBA 可能会像那样毫不拖延地进行合适的轮询。
此外,jdbcOutputChannel
是一个 ExecutorChannel
,意味着事务将在消息发送到该通道后立即提交。如果您希望在交易中流向 运行,则不应在此处使用调度程序。
编辑:
我仍然无法用这个重现你的情况...
<int:control-bus input-channel="input"/>
<int-event:inbound-channel-adapter channel="ps" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int:publish-subscribe-channel id="ps" />
<int:transformer input-channel="ps" output-channel="input" expression="'@foo.start()'" />
<int:transformer input-channel="ps" output-channel="input" expression="'@sa.start()'" />
<int:inbound-channel-adapter id="foo" channel="bar" expression="'foo'" auto-startup="false">
<int:poller fixed-rate="1000" />
</int:inbound-channel-adapter>
<int:channel id="bar">
<int:queue />
</int:channel>
<int:service-activator id="sa" input-channel="bar" output-channel="baz" auto-startup="false"
expression="payload.toUpperCase()">
<int:poller fixed-rate="6000" receive-timeout="0" />
</int:service-activator>
<int:logging-channel-adapter id="baz" level="ERROR"/>
...正如预期的那样,我每 6 秒看到 6 FOO
s(i-c-a
每秒轮询一次,而 sa 运行s 每 6 秒轮询一次)。
EDIT2:
我查看了您的项目,正如您所说,问题的根本原因是许多轮询端点,但实际上是这样的:
fixed-rate="0" receive-timeout="60000"
使用此配置,调度程序资源(线程)在 QueueChannel
中被阻塞,正如您所发现的,您已经耗尽了所有资源。
一种解决方案是增加调度程序池中的线程数。
使用此配置,您似乎正在尝试通过让轮询器在队列 receive() 方法中不断等待来获得按需、零延迟的消息传递。
如果您无法承受任何延迟,请考虑改用 DirectChannel
。如果您不希望下游端点在调用者的线程上 运行,请使用 ExecutorChannel
s...
<task:executor id="exec" pool-size="100"/>
<int:channel id="otherMessageChannel1">
<int:dispatcher task-executor="exec" />
</int:channel>
这通常优于您当前的设置。