并发 SQS 队列侦听器
Concurrent SQS queue listeners
我不明白 SQS QueueListener 是如何工作的。
这是我的配置:
/**
* AWS Credentials Bean
*/
@Bean
public AWSCredentials awsCredentials() {
return new BasicAWSCredentials(accessKey, secretAccessKey);
}
/**
* AWS Client Bean
*/
@Bean
public AmazonSQS amazonSQSAsyncClient() {
AmazonSQS sqsClient = new AmazonSQSClient(awsCredentials());
sqsClient.setRegion(Region.getRegion(Regions.US_EAST_1));
return sqsClient;
}
/**
* AWS Connection Factory
*/
@Bean
public SQSConnectionFactory connectionFactory() {
SQSConnectionFactory.Builder factoryBuilder = new SQSConnectionFactory.Builder(
Region.getRegion(Regions.US_EAST_1));
factoryBuilder.setAwsCredentialsProvider(new AWSCredentialsProvider() {
@Override
public AWSCredentials getCredentials() {
return awsCredentials();
}
@Override
public void refresh() {
}
});
return factoryBuilder.build();
}
/**
* Registering QueueListener for queueName
*/
@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer() {
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer.setConnectionFactory(connectionFactory());
messageListenerContainer.setDestinationName(queueName);
messageListenerContainer.setMessageListener(new MessageListenerAdapter(new LabQueueListener()));
messageListenerContainer.setErrorHandler(new QueueListenerErrorHandler());
messageListenerContainer.setTaskExecutor(Executors.newFixedThreadPool(3));
return messageListenerContainer;
}
如您所见,我已将 DefaultMessageListenerContainer
配置为 Executors.newFixedThreadPool(3)
这样我希望在我的队列侦听器中同时执行 3 个并发任务。
这是我的听众逻辑:
public class QueueListener {
public void handleMessage(String messageContent) {
try {
logger.info(String.format("message received: %s", messageContent));
logger.info("wait 30 sec");
Thread.sleep(1000 * 30);
logger.info("done");
} catch (Throwable th) {
throw new QueueListenerException(messageContent, th);
}
}
}
现在每个 handleMessage
方法会阻止 (Thread.sleep(1000 * 30);
) 执行 30 秒,并且一次只有 1 个 handleMessage
方法执行。
我做错了什么?
如何实现一次并发handleMessage
方法调用?
使用当前配置,我希望同时执行 3 handleMessage
。
您可以通过添加 messageListenerContainer.setConcurrency("3-10");
在 DefaultMessageListenerConfigurator 的 bean 中添加参数来处理并发执行,这意味着它将从 3 个线程开始并扩展到 10 个。
concurrentConsumers 的数量也可以使用 messageListenerContainer.setConcurrentConsumers(3);
来设置
我不明白 SQS QueueListener 是如何工作的。
这是我的配置:
/**
* AWS Credentials Bean
*/
@Bean
public AWSCredentials awsCredentials() {
return new BasicAWSCredentials(accessKey, secretAccessKey);
}
/**
* AWS Client Bean
*/
@Bean
public AmazonSQS amazonSQSAsyncClient() {
AmazonSQS sqsClient = new AmazonSQSClient(awsCredentials());
sqsClient.setRegion(Region.getRegion(Regions.US_EAST_1));
return sqsClient;
}
/**
* AWS Connection Factory
*/
@Bean
public SQSConnectionFactory connectionFactory() {
SQSConnectionFactory.Builder factoryBuilder = new SQSConnectionFactory.Builder(
Region.getRegion(Regions.US_EAST_1));
factoryBuilder.setAwsCredentialsProvider(new AWSCredentialsProvider() {
@Override
public AWSCredentials getCredentials() {
return awsCredentials();
}
@Override
public void refresh() {
}
});
return factoryBuilder.build();
}
/**
* Registering QueueListener for queueName
*/
@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer() {
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer.setConnectionFactory(connectionFactory());
messageListenerContainer.setDestinationName(queueName);
messageListenerContainer.setMessageListener(new MessageListenerAdapter(new LabQueueListener()));
messageListenerContainer.setErrorHandler(new QueueListenerErrorHandler());
messageListenerContainer.setTaskExecutor(Executors.newFixedThreadPool(3));
return messageListenerContainer;
}
如您所见,我已将 DefaultMessageListenerContainer
配置为 Executors.newFixedThreadPool(3)
这样我希望在我的队列侦听器中同时执行 3 个并发任务。
这是我的听众逻辑:
public class QueueListener {
public void handleMessage(String messageContent) {
try {
logger.info(String.format("message received: %s", messageContent));
logger.info("wait 30 sec");
Thread.sleep(1000 * 30);
logger.info("done");
} catch (Throwable th) {
throw new QueueListenerException(messageContent, th);
}
}
}
现在每个 handleMessage
方法会阻止 (Thread.sleep(1000 * 30);
) 执行 30 秒,并且一次只有 1 个 handleMessage
方法执行。
我做错了什么?
如何实现一次并发handleMessage
方法调用?
使用当前配置,我希望同时执行 3 handleMessage
。
您可以通过添加 messageListenerContainer.setConcurrency("3-10");
在 DefaultMessageListenerConfigurator 的 bean 中添加参数来处理并发执行,这意味着它将从 3 个线程开始并扩展到 10 个。
concurrentConsumers 的数量也可以使用 messageListenerContainer.setConcurrentConsumers(3);