如何使用 spring 集成来调整 sqs 队列的消耗
How to pace the consumption of a sqs queue using spring integration
我正在尝试设置一个集成流程来使用来自 amazon sqs 队列的消息,目前它运行良好。但我想调整每分钟或秒的消息数量。例如每分钟 20 条消息。
这是我的 sql 侦听器 bean
的定义
@Bean
public MessageProducer mySqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName);
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
adapter.setVisibilityTimeout(TIMEOUT_VISIBILITY);
adapter.setWaitTimeOut(TIMEOUT_MESSAGE_WAIT);
adapter.setMaxNumberOfMessages(prefetch);
adapter.setOutputChannel(processMessageChannel());
return adapter;
}
如您所见,我正在设置每次轮询要获取的最大消息数,但是如何设置轮询之间的延迟?
在常规 jms 队列中,我可以使用 JMS.inboundAdapter 使用自定义轮询器,但似乎使用 SqsMessageDrivenChannelAdapter 我无法设置任何轮询计时器值。
也许我可以使用 SqsMessageDrivenChannelAdapter 以外的 MessageProducer,但是是哪个?
是否可以使用 sqs 设置 JMS.inboundAdapter?
Spring 集成 SqsMessageDrivenChannelAdapter
是消息驱动程序活动组件。它基于 Springh Cloud AWS 项目中的 SimpleMessageListenerContainer
,该项目具有长 运行 while()
循环来调用 AmazonSQS.receiveMessage()
。该循环中的逻辑并不太复杂:
try {
ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
for (Message message : receiveMessageResult.getMessages()) {
if (isQueueRunning()) {
MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
} else {
messageBatchLatch.countDown();
}
}
try {
messageBatchLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} catch (Exception e) {
如您所见,我们在那里创建 messageBatchLatch
并在循环后等待它。
每条消息都由它们自己的 SignalExecutingRunnable
处理,其中 countDown()
在 MessageExecutor
的末尾。所以,你想做的可能是通过目标服务方法中的人工 Thread.sleep()
来实现,以便在 SQS 轮询之间有更多的间隔。
但我听到了你的要求,我们确实必须添加类似的内容:
/**
* The sleep interval in milliseconds used in the main loop between shards polling cycles.
* Defaults to {@code 1000} minimum {@code 250}.
* @param idleBetweenPolls the interval to sleep between shards polling cycles.
*/
public void setIdleBetweenPolls(int idleBetweenPolls) {
this.idleBetweenPolls = Math.max(250, idleBetweenPolls);
}
我为 KinesisMessageDrivenChannelAdapter
做了这件事,但在这里我们必须请求 Spring Cloud AWS 为 SimpleMessageListenerContainer
.
做这件事
我正在尝试设置一个集成流程来使用来自 amazon sqs 队列的消息,目前它运行良好。但我想调整每分钟或秒的消息数量。例如每分钟 20 条消息。
这是我的 sql 侦听器 bean
的定义 @Bean
public MessageProducer mySqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName);
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
adapter.setVisibilityTimeout(TIMEOUT_VISIBILITY);
adapter.setWaitTimeOut(TIMEOUT_MESSAGE_WAIT);
adapter.setMaxNumberOfMessages(prefetch);
adapter.setOutputChannel(processMessageChannel());
return adapter;
}
如您所见,我正在设置每次轮询要获取的最大消息数,但是如何设置轮询之间的延迟?
在常规 jms 队列中,我可以使用 JMS.inboundAdapter 使用自定义轮询器,但似乎使用 SqsMessageDrivenChannelAdapter 我无法设置任何轮询计时器值。
也许我可以使用 SqsMessageDrivenChannelAdapter 以外的 MessageProducer,但是是哪个?
是否可以使用 sqs 设置 JMS.inboundAdapter?
Spring 集成 SqsMessageDrivenChannelAdapter
是消息驱动程序活动组件。它基于 Springh Cloud AWS 项目中的 SimpleMessageListenerContainer
,该项目具有长 运行 while()
循环来调用 AmazonSQS.receiveMessage()
。该循环中的逻辑并不太复杂:
try {
ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
for (Message message : receiveMessageResult.getMessages()) {
if (isQueueRunning()) {
MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
} else {
messageBatchLatch.countDown();
}
}
try {
messageBatchLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} catch (Exception e) {
如您所见,我们在那里创建 messageBatchLatch
并在循环后等待它。
每条消息都由它们自己的 SignalExecutingRunnable
处理,其中 countDown()
在 MessageExecutor
的末尾。所以,你想做的可能是通过目标服务方法中的人工 Thread.sleep()
来实现,以便在 SQS 轮询之间有更多的间隔。
但我听到了你的要求,我们确实必须添加类似的内容:
/**
* The sleep interval in milliseconds used in the main loop between shards polling cycles.
* Defaults to {@code 1000} minimum {@code 250}.
* @param idleBetweenPolls the interval to sleep between shards polling cycles.
*/
public void setIdleBetweenPolls(int idleBetweenPolls) {
this.idleBetweenPolls = Math.max(250, idleBetweenPolls);
}
我为 KinesisMessageDrivenChannelAdapter
做了这件事,但在这里我们必须请求 Spring Cloud AWS 为 SimpleMessageListenerContainer
.