Spring - 如何从 queue 中读取多条消息并批量保存它们
Spring - How to read multiple messages off the queue and persist them in bulk
我正在尝试通过批量读取 queue 中的消息(假设一次读取 50 条消息)来提高使用(Spring 集成 + Solace 消息系统)构建的系统的性能) 并将它们保存到数据库中(批量)。
当我在 inbound-channel-adapter 中使用 Spring 集成轮询器时,配置如下:
<int:poller
messages-per-poll="100"
rate="1000"
/>
我有 2 个问题:
在 ServiceActivator 中,我仍然需要一次接收一条消息
时间(ServiceActivator 方法不允许方法声明
列表>参数
即使一次阅读 1 条消息,我也无法将它们收集到列表中
(按 50)批量保存,因为可能只有 40 条消息
在 queue 上,我需要(无限期地)等待另外 10 个
坚持。
我尝试用 com.solacesystems.jcsmp.FlowReceiver 来解决这个问题,它有一个缺点,就是我无法收到消息 headers。
你有什么建议?
Aggregator
EIP 适合您。它可以配置为将消息分组到 50
让我们说 Thread.currentThread.getId()
作为相关键。而且它还可以通过 AbstractMessageSourceAdvice.afterReceive()
:
的事实 "empty" 消息来释放组
/**
* Subclasses can take actions based on the result of the poll; e.g.
* adjust the {@code trigger}. The message can also be replaced with a new one.
* @param result the received message.
* @param source the message source.
* @return a message to continue to process the result, null to discard whatever the poll returned.
*/
public abstract Message<?> afterReceive(Message<?> result, MessageSource<?> source);
因此,当结果为 null
时,您应该触发 MessageGroupStoreReaper
使聚合器中的非 50 个组过期。
参见 Reference Manual。
我正在尝试通过批量读取 queue 中的消息(假设一次读取 50 条消息)来提高使用(Spring 集成 + Solace 消息系统)构建的系统的性能) 并将它们保存到数据库中(批量)。 当我在 inbound-channel-adapter 中使用 Spring 集成轮询器时,配置如下:
<int:poller messages-per-poll="100" rate="1000" />
我有 2 个问题:
在 ServiceActivator 中,我仍然需要一次接收一条消息 时间(ServiceActivator 方法不允许方法声明 列表>参数
即使一次阅读 1 条消息,我也无法将它们收集到列表中 (按 50)批量保存,因为可能只有 40 条消息 在 queue 上,我需要(无限期地)等待另外 10 个 坚持。
我尝试用 com.solacesystems.jcsmp.FlowReceiver 来解决这个问题,它有一个缺点,就是我无法收到消息 headers。
你有什么建议?
Aggregator
EIP 适合您。它可以配置为将消息分组到 50
让我们说 Thread.currentThread.getId()
作为相关键。而且它还可以通过 AbstractMessageSourceAdvice.afterReceive()
:
/**
* Subclasses can take actions based on the result of the poll; e.g.
* adjust the {@code trigger}. The message can also be replaced with a new one.
* @param result the received message.
* @param source the message source.
* @return a message to continue to process the result, null to discard whatever the poll returned.
*/
public abstract Message<?> afterReceive(Message<?> result, MessageSource<?> source);
因此,当结果为 null
时,您应该触发 MessageGroupStoreReaper
使聚合器中的非 50 个组过期。
参见 Reference Manual。