使用周期从频道消费给定数量的消息
Consuming given amount of messages from channel with period
先问Artem Bilan(我和你以前的同事一起工作)
我有这样的流量:
- JdbcPollingChannelAdapter 用于尽快获取数据集(因为数据库是流量瓶颈)
- CHAIN_OF_TRANSFORMATORS(因业务需要)
- ServiceActivator 使用
JmsTemplate
发送 JMS
根据要求,我必须在每个静态周期发送不同数量的消息(它由一些 "profile" 提供,具有“10,20,100...”结构,表示 "send 10 messages during first minute, 20 msg during second minute, 100 msg during third minute...")。
PeriodicPoller
.
提供句点的实现非常简单
问题是需求的第一部分。使用 SqlParameterSource
实现的 JdbcPollingChannelAdapter
获得给定数量 table 行的情况,但不适合如上所述 "bottleneck" 原因。可以"get N messages from channel"吗?
我尝试在适配器旁边的通道上使用一些 ReleaseStrategy
来实现它,但是没有运气将它与定期轮询结合起来。如果有人帮助我,我会详细描述使用的方法。
与使用 Java DSL 描述集成流上下文相关的额外困难,同时大部分示例使用 XML。
感谢任何建议!
在与 Dmitrii 私下讨论后,我们提出了这个解决方案。
要求
- 读取特定数量的消息,一条一条发送到JMS队列
- 消息数量在每个轮询间隔发生变化
- 发送消息并将其数量与特定轮询任务所需的数量进行比较。如果它们不相等,则会生成错误报告,表明没有足够的消息进行投票。
解决方案
- 在
QueueChannel
中收集消息
- 在每次轮询时为端点更改
maxMessagesPerPoll
<poller>
(PollerMetadata
) 有 adviceChain
选项。
- 有了它我们可以提供一些自定义
Advice
(MethodInterceptor
)
- 在
invocation.proceed()
之前将我们的PollingConsumer
注入其中以更改maxMessagesPerPoll
- 使用一些
AtomicInteger
bean 在 ChannelInterceptor#preSend
和 reset
中递增 Advice
- 此外,您可以在该建议中检查之前的轮询状态,将
maxMessagesPerPoll
与计数器进行比较。
先问Artem Bilan(我和你以前的同事一起工作)
我有这样的流量:
- JdbcPollingChannelAdapter 用于尽快获取数据集(因为数据库是流量瓶颈)
- CHAIN_OF_TRANSFORMATORS(因业务需要)
- ServiceActivator 使用
JmsTemplate
发送 JMS
根据要求,我必须在每个静态周期发送不同数量的消息(它由一些 "profile" 提供,具有“10,20,100...”结构,表示 "send 10 messages during first minute, 20 msg during second minute, 100 msg during third minute...")。
PeriodicPoller
.
问题是需求的第一部分。使用 SqlParameterSource
实现的 JdbcPollingChannelAdapter
获得给定数量 table 行的情况,但不适合如上所述 "bottleneck" 原因。可以"get N messages from channel"吗?
我尝试在适配器旁边的通道上使用一些 ReleaseStrategy
来实现它,但是没有运气将它与定期轮询结合起来。如果有人帮助我,我会详细描述使用的方法。
与使用 Java DSL 描述集成流上下文相关的额外困难,同时大部分示例使用 XML。
感谢任何建议!
在与 Dmitrii 私下讨论后,我们提出了这个解决方案。
要求
- 读取特定数量的消息,一条一条发送到JMS队列
- 消息数量在每个轮询间隔发生变化
- 发送消息并将其数量与特定轮询任务所需的数量进行比较。如果它们不相等,则会生成错误报告,表明没有足够的消息进行投票。
解决方案
- 在
QueueChannel
中收集消息
- 在每次轮询时为端点更改
maxMessagesPerPoll
<poller>
(PollerMetadata
) 有adviceChain
选项。- 有了它我们可以提供一些自定义
Advice
(MethodInterceptor
) - 在
invocation.proceed()
之前将我们的 - 使用一些
AtomicInteger
bean 在ChannelInterceptor#preSend
和reset
中递增Advice
- 此外,您可以在该建议中检查之前的轮询状态,将
maxMessagesPerPoll
与计数器进行比较。
PollingConsumer
注入其中以更改maxMessagesPerPoll