使用周期从频道消费给定数量的消息

Consuming given amount of messages from channel with period

先问Artem Bilan(我和你以前的同事一起工作)

我有这样的流量:

根据要求,我必须在每个静态周期发送不同数量的消息(它由一些 "profile" 提供,具有“10,20,100...”结构,表示 "send 10 messages during first minute, 20 msg during second minute, 100 msg during third minute...")。 PeriodicPoller.

提供句点的实现非常简单

问题是需求的第一部分。使用 SqlParameterSource 实现的 JdbcPollingChannelAdapter 获得给定数量 table 行的情况,但不适合如上所述 "bottleneck" 原因。可以"get N messages from channel"吗?

我尝试在适配器旁边的通道上使用一些 ReleaseStrategy 来实现它,但是没有运气将它与定期轮询结合起来。如果有人帮助我,我会详细描述使用的方法。

与使用 Java DSL 描述集成流上下文相关的额外困难,同时大部分示例使用 XML。

感谢任何建议!

在与 Dmitrii 私下讨论后,我们提出了这个解决方案。

要求

  • 读取特定数量的消息,一条一条发送到JMS队列
  • 消息数量在每个轮询间隔发生变化
  • 发送消息并将其数量与特定轮询任务所需的数量进行比较。如果它们不相等,则会生成错误报告,表明没有足够的消息进行投票。

解决方案

  1. QueueChannel
  2. 中收集消息
  3. 在每次轮询时为端点更改 maxMessagesPerPoll
  4. <poller> (PollerMetadata) 有 adviceChain 选项。
  5. 有了它我们可以提供一些自定义 Advice (MethodInterceptor)
  6. invocation.proceed()
  7. 之前将我们的PollingConsumer注入其中以更改maxMessagesPerPoll
  8. 使用一些 AtomicInteger bean 在 ChannelInterceptor#preSendreset 中递增 Advice
  9. 此外,您可以在该建议中检查之前的轮询状态,将 maxMessagesPerPoll 与计数器进行比较。