Spring 集成消息轮询
Spring Integration message polling
我有一个 Spring 配置设置,用于从数据库队列轮询消息:
<int:annotation-config default-publisher-channel="messageChannel" />
<task:executor id="messageTaskExecutor" pool-size="1"
queue-capacity="1" rejection-policy="CALLER_RUNS" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>
<bean id="messageQueryProvider"
class="org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider" />
<bean id="messageSessionStore"
class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource" />
<property name="channelMessageStoreQueryProvider" ref="messageQueryProvider" />
<property name="tablePrefix" value="QUEUE_" />
<property name="usingIdCache" value="true" />
</bean>
<int:channel id="messageChannel">
<int:queue message-store="messageSessionStore" />
</int:channel>
<int:poller id="defaultPoller" fixed-delay="500" max-messages-per-poll="1" task-executor="messageTaskExecutor" default="true">
<int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="eosTransactionManager"/>
</int:poller>
但是,该应用程序在多个节点上运行。当服务器重启时,消息似乎被超过1个节点拾取(节点全部关闭并依次重启)。有什么办法可以避免多消息处理吗?
这在某种程度上无法使用 OracleChannelMessageStoreQueryProvider
。只是因为我们在那里依赖了FOR UPDATE SKIP LOCKED
。因此,当一个节点执行 SELECT
时,记录被锁定,下一个将转到 table.
中的下一个空闲行
JavaDoc 中没有 setUsingIdCache()
:
* <p>If using the provided {@link OracleChannelMessageStoreQueryProvider}, don't set {@link #usingIdCache}
* to true, as the Oracle query will ignore locked rows.</p>
但我认为这完全无关。删除该选项和 <int:transaction-synchronization-factory>
您将简化配置,但不得更改行为。
我想你看到的是这样的round-robin
:一个节点获得第一行,下一个节点跳过它并获得下一个。
我不相信不同的节点在使用 Oracle 时会收到相同的消息。
我有一个 Spring 配置设置,用于从数据库队列轮询消息:
<int:annotation-config default-publisher-channel="messageChannel" />
<task:executor id="messageTaskExecutor" pool-size="1"
queue-capacity="1" rejection-policy="CALLER_RUNS" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>
<bean id="messageQueryProvider"
class="org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider" />
<bean id="messageSessionStore"
class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource" />
<property name="channelMessageStoreQueryProvider" ref="messageQueryProvider" />
<property name="tablePrefix" value="QUEUE_" />
<property name="usingIdCache" value="true" />
</bean>
<int:channel id="messageChannel">
<int:queue message-store="messageSessionStore" />
</int:channel>
<int:poller id="defaultPoller" fixed-delay="500" max-messages-per-poll="1" task-executor="messageTaskExecutor" default="true">
<int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="eosTransactionManager"/>
</int:poller>
但是,该应用程序在多个节点上运行。当服务器重启时,消息似乎被超过1个节点拾取(节点全部关闭并依次重启)。有什么办法可以避免多消息处理吗?
这在某种程度上无法使用 OracleChannelMessageStoreQueryProvider
。只是因为我们在那里依赖了FOR UPDATE SKIP LOCKED
。因此,当一个节点执行 SELECT
时,记录被锁定,下一个将转到 table.
JavaDoc 中没有 setUsingIdCache()
:
* <p>If using the provided {@link OracleChannelMessageStoreQueryProvider}, don't set {@link #usingIdCache}
* to true, as the Oracle query will ignore locked rows.</p>
但我认为这完全无关。删除该选项和 <int:transaction-synchronization-factory>
您将简化配置,但不得更改行为。
我想你看到的是这样的round-robin
:一个节点获得第一行,下一个节点跳过它并获得下一个。
我不相信不同的节点在使用 Oracle 时会收到相同的消息。