从 sqs fifo 队列获取消息后按相同顺序执行消息
Executing messages in same sequence after getting the messages from sqs fifo queue
我正在定义一个类似于下面的消息侦听器。消息是查询语句。消息按顺序推送到队列中,以便在写入表时不会违反任何外键约束。当我将 maxNumberOfMessages 设置为 10 时,我发现查询似乎是乱序执行的。但是,当我将其设置为 1 时,没有发现任何问题。当我将 maxNumberOfMessages 设置为大于 1 的值时,如何确保消息的读取顺序与它们在队列中的顺序相同?
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAmazonSqs);
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
simpleMessageListenerContainer.setMaxNumberOfMessages(10);
return simpleMessageListenerContainer;
}
/**
 * Listener for the queue configured under {@code sqs.url}; passes the message body
 * to {@code repository.execute}. The deletion policy is {@code ON_SUCCESS}.
 *
 * @param serviceData message payload (per the surrounding text, a query statement)
 * @param messageId value of the {@code MessageId} header; not used in the body
 * @param approximateFirstReceiveTimestamp value of the
 *     {@code ApproximateFirstReceiveTimestamp} header; not used in the body
 */
@SqsListener(value = "${sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(
        String serviceData,
        @Header("MessageId") String messageId,
        @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
    repository.execute(serviceData);
}
修改后的代码:
/**
 * Builds the listener container factory: SQS client from {@code amazonSQSAsyncClient()},
 * auto-startup disabled, max-number-of-messages of 10, and the task executor from
 * {@code threadPoolTaskExecutor()}.
 *
 * <p>NOTE(review): whether the container also needs a thread from this executor for its
 * own polling is not visible here; confirm that the configured pool size is sufficient.
 *
 * @return the configured factory
 */
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSQSAsyncClient());
    factory.setMaxNumberOfMessages(10);
    factory.setAutoStartup(false);
    factory.setTaskExecutor(threadPoolTaskExecutor());
    return factory;
}
/**
 * Provides an initialized executor whose core and maximum pool size are both 1
 * and whose threads are named with the prefix {@code queueExecutor}.
 *
 * @return the initialized executor
 */
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor singleThreadPool = new ThreadPoolTaskExecutor();
    singleThreadPool.setThreadNamePrefix("queueExecutor");
    // Core and max are both 1, so the pool never holds more than one thread.
    singleThreadPool.setCorePoolSize(1);
    singleThreadPool.setMaxPoolSize(1);
    singleThreadPool.initialize();
    return singleThreadPool;
}
/**
 * Listener for the queue configured under {@code sqs.url}; passes the message body
 * to {@code repository.execute}. The deletion policy is {@code ON_SUCCESS}.
 *
 * @param serviceData message payload (per the surrounding text, a query statement)
 * @param messageId value of the {@code MessageId} header; not used in the body
 * @param approximateFirstReceiveTimestamp value of the
 *     {@code ApproximateFirstReceiveTimestamp} header; not used in the body
 */
@SqsListener(value = "${sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(
        String serviceData,
        @Header("MessageId") String messageId,
        @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
    repository.execute(serviceData);
}
那是因为逻辑是这样的:
// Fetch messages from the queue using the prepared receive request.
ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
// The latch starts at the number of messages received in this call.
CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
for (Message message : receiveMessageResult.getMessages()) {
if (isQueueRunning()) {
MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
// Each message is submitted to the task executor as its own task.
getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
} else {
// Message is not dispatched; count it down here so the latch still accounts for it.
messageBatchLatch.countDown();
}
}
请关注 getTaskExecutor().execute()。这就是您的消息被转移到各自的线程中执行的方式。
您可以考虑将其重新配置为池大小为 1 的 ThreadPoolTaskExecutor。这样,您的所有消息都将在同一个线程中处理,从而保证执行顺序。
我正在定义一个类似于下面的消息侦听器。消息是查询语句。消息按顺序推送到队列中,以便在写入表时不会违反任何外键约束。当我将 maxNumberOfMessages 设置为 10 时,我发现查询似乎是乱序执行的。但是,当我将其设置为 1 时,没有发现任何问题。当我将 maxNumberOfMessages 设置为大于 1 的值时,如何确保消息的读取顺序与它们在队列中的顺序相同?
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAmazonSqs);
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
simpleMessageListenerContainer.setMaxNumberOfMessages(10);
return simpleMessageListenerContainer;
}
/**
 * Listener for the queue configured under {@code sqs.url}; passes the message body
 * to {@code repository.execute}. The deletion policy is {@code ON_SUCCESS}.
 *
 * @param serviceData message payload (per the surrounding text, a query statement)
 * @param messageId value of the {@code MessageId} header; not used in the body
 * @param approximateFirstReceiveTimestamp value of the
 *     {@code ApproximateFirstReceiveTimestamp} header; not used in the body
 */
@SqsListener(value = "${sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(
        String serviceData,
        @Header("MessageId") String messageId,
        @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
    repository.execute(serviceData);
}
修改后的代码:
/**
 * Builds the listener container factory: SQS client from {@code amazonSQSAsyncClient()},
 * auto-startup disabled, max-number-of-messages of 10, and the task executor from
 * {@code threadPoolTaskExecutor()}.
 *
 * <p>NOTE(review): whether the container also needs a thread from this executor for its
 * own polling is not visible here; confirm that the configured pool size is sufficient.
 *
 * @return the configured factory
 */
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSQSAsyncClient());
    factory.setMaxNumberOfMessages(10);
    factory.setAutoStartup(false);
    factory.setTaskExecutor(threadPoolTaskExecutor());
    return factory;
}
/**
 * Provides an initialized executor whose core and maximum pool size are both 1
 * and whose threads are named with the prefix {@code queueExecutor}.
 *
 * @return the initialized executor
 */
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor singleThreadPool = new ThreadPoolTaskExecutor();
    singleThreadPool.setThreadNamePrefix("queueExecutor");
    // Core and max are both 1, so the pool never holds more than one thread.
    singleThreadPool.setCorePoolSize(1);
    singleThreadPool.setMaxPoolSize(1);
    singleThreadPool.initialize();
    return singleThreadPool;
}
/**
 * Listener for the queue configured under {@code sqs.url}; passes the message body
 * to {@code repository.execute}. The deletion policy is {@code ON_SUCCESS}.
 *
 * @param serviceData message payload (per the surrounding text, a query statement)
 * @param messageId value of the {@code MessageId} header; not used in the body
 * @param approximateFirstReceiveTimestamp value of the
 *     {@code ApproximateFirstReceiveTimestamp} header; not used in the body
 */
@SqsListener(value = "${sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(
        String serviceData,
        @Header("MessageId") String messageId,
        @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
    repository.execute(serviceData);
}
那是因为逻辑是这样的:
// Fetch messages from the queue using the prepared receive request.
ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
// The latch starts at the number of messages received in this call.
CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
for (Message message : receiveMessageResult.getMessages()) {
if (isQueueRunning()) {
MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
// Each message is submitted to the task executor as its own task.
getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
} else {
// Message is not dispatched; count it down here so the latch still accounts for it.
messageBatchLatch.countDown();
}
}
请关注 getTaskExecutor().execute()。这就是您的消息被转移到各自的线程中执行的方式。
您可以考虑将其重新配置为池大小为 1 的 ThreadPoolTaskExecutor。这样,您的所有消息都将在同一个线程中处理,从而保证执行顺序。