JmsListener SQS unable to consume messages

I have a setup in which 10 messages at a time arrive in an SQS queue in parallel. To consume them, I am using JmsListener.

Here is my configuration:

  public SQSConnectionFactory sqsConnectionFactory() {
    // Create a new connection factory with all defaults (credentials and region) set automatically
    return new SQSConnectionFactory(new ProviderConfiguration(),
        AmazonSQSClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
            .withCredentials(DefaultAWSCredentialsProviderChain.getInstance()).build());
  }

  @Bean("jmsListenerContainerFactory")
  public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(sqsConnectionFactory());
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setConcurrency("3-10");
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
  }

To consume from the queue:

  @JmsListener(destination = "queue.fifo",
      containerFactory = "jmsListenerContainerFactory")
  public void receiveCustomerStakeholderKyc(@Payload final Message<?> message) throws Exception {

}

When I use this, some messages never reach my code at all. JMS does not consume them, and they end up in the dead-letter queue.

Queues:
1. queue.fifo

Name:   queue.fifo  
Default Visibility Timeout: 30 seconds
Message Retention Period:   4 days
Maximum Message Size:   256 KB
Created:    2019-09-16 12:50:43 GMT+05:30   
Receive Message Wait Time:  0 seconds
Last Updated:   2020-06-12 16:35:29 GMT+05:30   
Messages Available (Visible):   0
Delivery Delay: 0 seconds   
Messages in Flight (Not Visible):   0
Queue Type: FIFO    
Messages Delayed:   0
Content-Based Deduplication:    Enabled     

2. queue_dead.fifo
Default Visibility Timeout: 30 seconds  
Message Retention Period:   4 days  
Maximum Message Size:   256 KB
Created:    2019-09-16 12:51:08 GMT+05:30   
Receive Message Wait Time:  0 seconds
Last Updated:   2020-06-12 16:47:17 GMT+05:30   
Messages Available (Visible):   5
Delivery Delay: 0 seconds   
Messages in Flight (Not Visible):   0
Queue Type: FIFO
Messages Delayed:   0
Content-Based Deduplication:    Disabled

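Is there anything I am missing?

  1. When I check the AWS console, it shows that messages were received at that time, but they never appear in my logs.

Is there a way to enable SQS logging?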

I must say I made a silly mistake, but it could happen to anyone.

Reason for the mistake

I had separate Dev and QA accounts on AWS. To save money, we merged the two accounts, but we kept the SQS queues in separate accounts.

Way to access SQS queue

I was creating the SQS connection factory like this:

 public SQSConnectionFactory sqsConnectionFactory() {
    // Create a new connection factory with all defaults (credentials and region) set automatically
    return new SQSConnectionFactory(new ProviderConfiguration(),
        AmazonSQSClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
            .withCredentials(DefaultAWSCredentialsProviderChain.getInstance()).build());
  }

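With this setup, JMS resolves the SQS queue URL from the AWS access key and secret key, that is, from the account those credentials belong to. Because we had merged the Dev and QA accounts, even the QA instance of my application was resolving the queue name to the Dev SQS URL.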
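To make the failure mode concrete, here is a minimal, dependency-free sketch of name-only queue resolution. It is an illustration rather than the AWS SDK's actual code: the class, the method names, and the two 12-digit account IDs are hypothetical placeholders, and the only fact relied on is the usual SQS URL layout `https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>`, where the account defaults to the owner of the caller's credentials unless an owner account ID is passed explicitly.

```java
import java.util.Objects;

// Illustrative only: mimics how a queue name maps to a queue URL.
class QueueUrlSketch {

    // Builds a URL in the usual SQS layout for the given region, account and queue.
    static String queueUrl(String region, String accountId, String queueName) {
        Objects.requireNonNull(region, "region");
        Objects.requireNonNull(accountId, "accountId");
        Objects.requireNonNull(queueName, "queueName");
        return "https://sqs." + region + ".amazonaws.com/" + accountId + "/" + queueName;
    }

    // With no explicit owner, the account behind the caller's credentials is used.
    static String resolve(String region, String credentialsAccountId,
                          String ownerAccountId, String queueName) {
        String account = (ownerAccountId != null) ? ownerAccountId : credentialsAccountId;
        return queueUrl(region, account, queueName);
    }

    public static void main(String[] args) {
        String devAccount = "111111111111"; // placeholder ID: account owning the credentials
        String qaAccount = "222222222222";  // placeholder ID: account owning the QA queue

        // QA instance, credentials from the merged account, no owner given:
        // the name resolves to the queue in the credentials' account.
        System.out.println(resolve("ap-south-1", devAccount, null, "queue.fifo"));

        // Same credentials, but with the owning account named explicitly.
        System.out.println(resolve("ap-south-1", devAccount, qaAccount, "queue.fifo"));
    }
}
```

The two calls differ only in the account segment of the URL, which is why a QA listener can end up polling a different queue than the one the producers write to.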

Solution

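I solved it by resolving the SQS destination dynamically from the AWS account ID instead of relying on the AWS access key and secret key.

Here is the resolver code: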

Pass the ownerAccountId

  public static class CustomDynamicDestinationResolver implements DestinationResolver {

        private String ownerAccountId;

        public CustomDynamicDestinationResolver(String ownerAccountId) {
            this.ownerAccountId = ownerAccountId;
        }

        @Override
        public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
            Assert.notNull(session, "Session must not be null");
            Assert.notNull(destinationName, "Destination name must not be null");
            if (pubSubDomain) {
                return resolveTopic(session, destinationName);
            } else {
                return resolveQueue(session, destinationName);
            }
        }

        protected Topic resolveTopic(Session session, String topicName) throws JMSException {
            return session.createTopic(topicName);
        }

        protected Queue resolveQueue(Session session, String queueName) throws JMSException {
            Queue queue;
            //LOGGER.info("Getting destination for libraryOwnerAccountId: {}, queueName: {}", ownerAccountId, queueName);
            if (ownerAccountId != null && session instanceof SQSSession) {
                queue = ((SQSSession) session).createQueue(queueName, ownerAccountId);
            } else {
                queue = session.createQueue(queueName);
            }
            return queue;
        }
  }
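
For completeness, this is roughly how the resolver could be plugged into the listener container factory from the question. Treat it as a sketch under assumptions: the class name `SqsJmsConfig` and the property `aws.sqs.owner-account-id` are made up for illustration, the `SQSConnectionFactory` is assumed to be registered as a bean, and `CustomDynamicDestinationResolver` is assumed to be importable from wherever you declared it.

```java
import javax.jms.ConnectionFactory;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
public class SqsJmsConfig {

    // Hypothetical property: the 12-digit ID of the account that owns the queue.
    @Value("${aws.sqs.owner-account-id}")
    private String ownerAccountId;

    @Bean("jmsListenerContainerFactory")
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            ConnectionFactory sqsConnectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(sqsConnectionFactory);
        // Replaces DynamicDestinationResolver so queue names are resolved
        // against the owning account rather than the credentials' account.
        // Add an import (or qualify with the outer class) for the resolver.
        factory.setDestinationResolver(new CustomDynamicDestinationResolver(ownerAccountId));
        factory.setConcurrency("3-10");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }
}
```

Keeping the account ID in per-environment configuration means the Dev and QA instances can share credentials and still listen on their own queues.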