SqsListener 没有消费队列中的消息
SQSListener not consuming messages from queue
我的 @SqsListener 没有消费到 SQS 队列中的任何消息。
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener; //others
/**
 * Component that listens on the {@code TEST-MY-QUEUE} SQS queue via {@code @SqsListener}.
 */
@Component
public class Consumer {

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    /**
     * Handles one message from {@code TEST-MY-QUEUE}: echoes the payload to standard
     * output and logs it at INFO level.
     *
     * @param stringJson the message payload passed in by the listener infrastructure
     */
    @SqsListener(value = "TEST-MY-QUEUE")
    public void receiveMessage(String stringJson) {
        // Console echo retained from the original snippet.
        System.out.println("***Consuming message: " + stringJson);
        // Parameterized logging: the message is only formatted when INFO is enabled.
        logger.info("Consuming message: {}", stringJson);
    }
}
我的配置如下(我在这里打印了客户端可见的队列,可以确定其中包含我想要消费的队列 TEST-MY-QUEUE,并且打印出的 URL 位于正确的区域。
我也能在 regionProvider 中看到加载的区域是正确的,与队列所在区域相同):
@Configuration
public class AwsConfiguration {
/**
 * Creates the primary {@code AmazonSQSAsync} bean from the SDK's default client builder
 * and prints the result of {@code listQueues()} to standard output.
 *
 * @return the SQS async client
 */
@Bean
@Primary
AmazonSQSAsync sqsClient() {
    final AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.defaultClient();
    final Object visibleQueues = client.listQueues(); //The queue I want to consume is here
    System.out.println("Client queues = " + visibleQueues);
    return client;
}
/**
 * Exposes a {@code DefaultAwsRegionProviderChain} as the region provider bean and
 * prints the region it reports to standard output.
 *
 * @return the default region provider chain
 */
@Bean
AwsRegionProvider regionProvider() {
    final DefaultAwsRegionProviderChain chain = new DefaultAwsRegionProviderChain();
    final Object resolvedRegion = chain.getRegion();
    System.out.println("Region = " + resolvedRegion);
    return chain;
}
/**
 * Builds the listener container bean, wiring in the given SQS client and message
 * handler, a fresh executor from {@code threadPoolTaskExecutor()}, and a
 * max-number-of-messages setting of 10.
 *
 * @param amazonSQSAsync      the SQS client handed to the container
 * @param queueMessageHandler the handler handed to the container
 * @return the configured container
 */
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(
        AmazonSQSAsync amazonSQSAsync,
        QueueMessageHandler queueMessageHandler) {
    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setAmazonSqs(amazonSQSAsync);
    container.setMessageHandler(queueMessageHandler);
    container.setTaskExecutor(threadPoolTaskExecutor());
    container.setMaxNumberOfMessages(10);
    return container;
}
/**
 * Creates the {@code QueueMessageHandler} bean through a
 * {@code QueueMessageHandlerFactory} configured with the given SQS client.
 *
 * @param amazonSQSAsync the SQS client set on the factory
 * @return the handler produced by the factory
 */
@Bean
public QueueMessageHandler queueMessageHandler(AmazonSQSAsync amazonSQSAsync) {
    final QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    factory.setAmazonSqs(amazonSQSAsync);
    return factory.createQueueMessageHandler();
}
/**
 * Builds and initializes a new {@code ThreadPoolTaskExecutor} whose core and maximum
 * pool sizes are both 10. This method carries no {@code @Bean} annotation, so each
 * call constructs a separate executor.
 *
 * @return the initialized executor
 */
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    final int poolSize = 10;
    final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(poolSize);
    pool.setMaxPoolSize(poolSize);
    pool.initialize();
    return pool;
}
以及 pom.xml(Java 11、Spring Boot、Spring Cloud AWS):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.6</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-core</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-autoconfigure</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-aws</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-aws-messaging</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
我注意到这里有一个非常相似的问题,于是把 pom.xml 中的依赖项改成了 spring-cloud-starter-aws-messaging,但这并没有解决我的问题。我仔细检查了各个名称(队列、注解),一切看起来都没有问题。
运行应用程序时,它可以正常启动,但我看不到任何日志或异常,也没有任何一条消息被消费。
我遗漏了什么?
谢谢
您正在使用第三方 API。要从 Java 项目调用 Amazon Simple Queue Service (SQS),请使用官方 AWS SDK for Java V2。如果您不知道如何使用此 SDK,请参阅此开发指南:
Developer guide - AWS SDK for Java 2.x
有关 AWS SQS 的特定信息,请参阅:
Working with Amazon Simple Queue Service
这有指向 AWS Github 的链接,您可以在其中找到 POM 依赖项、代码等。
最终发现是配置问题(与凭据的使用方式有关)。
在application.yml
credentials:
useDefaultAwsCredentialsChain: true # Will use credentials in ~/.aws
然后在创建 AmazonSQSAsync 的 AWSConfig class 中,让它使用该配置
// Builds the SQS async client with the standard builder, passing an explicit
// DefaultAWSCredentialsProviderChain as the credentials provider.
// NOTE(review): `region` is not declared in this snippet — presumably a field or
// injected property of the enclosing class; confirm.
// NOTE(review): the snippet ends without the method's closing brace.
public AmazonSQSAsync amazonSQSAsync() {
DefaultAWSCredentialsProviderChain defaultAWSCredentialsProviderChain = new DefaultAWSCredentialsProviderChain();
return AmazonSQSAsyncClientBuilder.standard().withRegion(region)
.withCredentials(defaultAWSCredentialsProviderChain)
.build();
我的 @SqsListener 没有消费到 SQS 队列中的任何消息。
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener; //others
/**
 * Component that listens on the {@code TEST-MY-QUEUE} SQS queue via {@code @SqsListener}.
 */
@Component
public class Consumer {

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    /**
     * Handles one message from {@code TEST-MY-QUEUE}: echoes the payload to standard
     * output and logs it at INFO level.
     *
     * @param stringJson the message payload passed in by the listener infrastructure
     */
    @SqsListener(value = "TEST-MY-QUEUE")
    public void receiveMessage(String stringJson) {
        // Console echo retained from the original snippet.
        System.out.println("***Consuming message: " + stringJson);
        // Parameterized logging: the message is only formatted when INFO is enabled.
        logger.info("Consuming message: {}", stringJson);
    }
}
我的配置如下(我在这里打印了客户端可见的队列,可以确定其中包含我想要消费的队列 TEST-MY-QUEUE,并且打印出的 URL 位于正确的区域。
我也能在 regionProvider 中看到加载的区域是正确的,与队列所在区域相同):
@Configuration
public class AwsConfiguration {
/**
 * Creates the primary {@code AmazonSQSAsync} bean from the SDK's default client builder
 * and prints the result of {@code listQueues()} to standard output.
 *
 * @return the SQS async client
 */
@Bean
@Primary
AmazonSQSAsync sqsClient() {
    final AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.defaultClient();
    final Object visibleQueues = client.listQueues(); //The queue I want to consume is here
    System.out.println("Client queues = " + visibleQueues);
    return client;
}
/**
 * Exposes a {@code DefaultAwsRegionProviderChain} as the region provider bean and
 * prints the region it reports to standard output.
 *
 * @return the default region provider chain
 */
@Bean
AwsRegionProvider regionProvider() {
    final DefaultAwsRegionProviderChain chain = new DefaultAwsRegionProviderChain();
    final Object resolvedRegion = chain.getRegion();
    System.out.println("Region = " + resolvedRegion);
    return chain;
}
/**
 * Builds the listener container bean, wiring in the given SQS client and message
 * handler, a fresh executor from {@code threadPoolTaskExecutor()}, and a
 * max-number-of-messages setting of 10.
 *
 * @param amazonSQSAsync      the SQS client handed to the container
 * @param queueMessageHandler the handler handed to the container
 * @return the configured container
 */
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(
        AmazonSQSAsync amazonSQSAsync,
        QueueMessageHandler queueMessageHandler) {
    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setAmazonSqs(amazonSQSAsync);
    container.setMessageHandler(queueMessageHandler);
    container.setTaskExecutor(threadPoolTaskExecutor());
    container.setMaxNumberOfMessages(10);
    return container;
}
/**
 * Creates the {@code QueueMessageHandler} bean through a
 * {@code QueueMessageHandlerFactory} configured with the given SQS client.
 *
 * @param amazonSQSAsync the SQS client set on the factory
 * @return the handler produced by the factory
 */
@Bean
public QueueMessageHandler queueMessageHandler(AmazonSQSAsync amazonSQSAsync) {
    final QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    factory.setAmazonSqs(amazonSQSAsync);
    return factory.createQueueMessageHandler();
}
/**
 * Builds and initializes a new {@code ThreadPoolTaskExecutor} whose core and maximum
 * pool sizes are both 10. This method carries no {@code @Bean} annotation, so each
 * call constructs a separate executor.
 *
 * @return the initialized executor
 */
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    final int poolSize = 10;
    final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(poolSize);
    pool.setMaxPoolSize(poolSize);
    pool.initialize();
    return pool;
}
以及 pom.xml(Java 11、Spring Boot、Spring Cloud AWS):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.6</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-core</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-autoconfigure</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-aws</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-aws-messaging</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
我注意到这里有一个非常相似的问题,于是把 pom.xml 中的依赖项改成了 spring-cloud-starter-aws-messaging,但这并没有解决我的问题。我仔细检查了各个名称(队列、注解),一切看起来都没有问题。运行应用程序时,它可以正常启动,但我看不到任何日志或异常,也没有任何一条消息被消费。我遗漏了什么?
谢谢
您正在使用第三方 API。要从 Java 项目调用 Amazon Simple Queue Service (SQS),请使用官方 AWS SDK for Java V2。如果您不知道如何使用此 SDK,请参阅此开发指南:
Developer guide - AWS SDK for Java 2.x
有关 AWS SQS 的特定信息,请参阅:
Working with Amazon Simple Queue Service
这有指向 AWS Github 的链接,您可以在其中找到 POM 依赖项、代码等。
最终发现是配置问题(与凭据的使用方式有关)。
在application.yml
credentials:
useDefaultAwsCredentialsChain: true # Will use credentials in ~/.aws
然后在创建 AmazonSQSAsync 的 AWSConfig class 中,让它使用该配置
public AmazonSQSAsync amazonSQSAsync() {
DefaultAWSCredentialsProviderChain defaultAWSCredentialsProviderChain = new DefaultAWSCredentialsProviderChain();
return AmazonSQSAsyncClientBuilder.standard().withRegion(region)
.withCredentials(defaultAWSCredentialsProviderChain)
.build();