JmsListener SQS unable to consume messages
I have a setup where 10 messages at a time arrive in an SQS queue in parallel.
To consume them, I am using JmsListener.
Here is my configuration:
public SQSConnectionFactory sqsConnectionFactory() {
    // Create a new connection factory with all defaults (credentials and region) set automatically
    return new SQSConnectionFactory(new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
                    .withCredentials(DefaultAWSCredentialsProviderChain.getInstance()).build());
}

@Bean("jmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(sqsConnectionFactory());
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setConcurrency("3-10");
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}
To consume from the queue:
@JmsListener(destination = "queue.fifo",
        containerFactory = "jmsListenerContainerFactory")
public void receiveCustomerStakeholderKyc(@Payload final Message<?> message) throws Exception {
}
When I use this, some messages never even reach my code. JMS does not consume them, and they end up in the dead-letter queue (queue_dead.fifo).
Queues:
1. queue.fifo
Name: queue.fifo
Default Visibility Timeout: 30 seconds
Message Retention Period: 4 days
Maximum Message Size: 256 KB
Created: 2019-09-16 12:50:43 GMT+05:30
Receive Message Wait Time: 0 seconds
Last Updated: 2020-06-12 16:35:29 GMT+05:30
Messages Available (Visible): 0
Delivery Delay: 0 seconds
Messages in Flight (Not Visible): 0
Queue Type: FIFO
Messages Delayed: 0
Content-Based Deduplication: Enabled
2. queue_dead.fifo
Default Visibility Timeout: 30 seconds
Message Retention Period: 4 days
Maximum Message Size: 256 KB
Created: 2019-09-16 12:51:08 GMT+05:30
Receive Message Wait Time: 0 seconds
Last Updated: 2020-06-12 16:47:17 GMT+05:30
Messages Available (Visible): 5
Delivery Delay: 0 seconds
Messages in Flight (Not Visible): 0
Queue Type: FIFO
Messages Delayed: 0
Content-Based Deduplication: Disabled
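Is there anything I am missing?
- When I look at the AWS console, it says the messages were received at that time, but nothing shows up in my logs.
Is there a way to enable SQS logging?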
I have to admit I made a silly mistake, but it could happen to anyone.
Reason for the mistake
I have a Dev account and a QA account on AWS. To save money we merged the two accounts, but we kept SQS in separate accounts.
Way to access SQS queue
I was creating the SQS connection factory like this:
public SQSConnectionFactory sqsConnectionFactory() {
    // Create a new connection factory with all defaults (credentials and region) set automatically
    return new SQSConnectionFactory(new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
                    .withCredentials(DefaultAWSCredentialsProviderChain.getInstance()).build());
}
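Set up this way, JMS resolves the SQS queue URL from the AWS access key and secret key. Now that we have merged the Dev and QA accounts, even the QA instance of my application was ending up with the Dev SQS URL.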
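To make the mix-up concrete, here is a minimal sketch of why the account matters. It assumes the standard public SQS endpoint form, in which the owning account ID is part of the queue URL. The class name, the helper method, and both 12-digit account IDs are placeholders for illustration, not values from my setup.

```java
class QueueUrlSketch {

    // Builds a queue URL in the standard public-endpoint form:
    // https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>
    static String queueUrl(String region, String accountId, String queueName) {
        return "https://sqs." + region + ".amazonaws.com/" + accountId + "/" + queueName;
    }

    public static void main(String[] args) {
        String region = "ap-south-1";
        String devAccountId = "111111111111"; // placeholder account ID
        String qaAccountId = "222222222222";  // placeholder account ID
        String queueName = "queue.fifo";

        // Same queue name, different owning account: two different queues.
        System.out.println(queueUrl(region, devAccountId, queueName));
        System.out.println(queueUrl(region, qaAccountId, queueName));
    }
}
```

When only a queue name is supplied, the lookup is made against the account that owns the credentials, so two application instances sharing credentials will both land on the same queue even if they were meant to use different ones.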
Solution
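I solved it by resolving the SQS destination dynamically using the AWS account ID instead of relying on the AWS access key and secret key.
Here is the resolver code: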
Pass the ownerAccountId
public static class CustomDynamicDestinationResolver implements DestinationResolver {

    private String ownerAccountId;

    public CustomDynamicDestinationResolver(String ownerAccountId) {
        this.ownerAccountId = ownerAccountId;
    }

    @Override
    public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
        Assert.notNull(session, "Session must not be null");
        Assert.notNull(destinationName, "Destination name must not be null");
        if (pubSubDomain) {
            return resolveTopic(session, destinationName);
        } else {
            return resolveQueue(session, destinationName);
        }
    }

    protected Topic resolveTopic(Session session, String topicName) throws JMSException {
        return session.createTopic(topicName);
    }

    protected Queue resolveQueue(Session session, String queueName) throws JMSException {
        Queue queue;
        //LOGGER.info("Getting destination for libraryOwnerAccountId: {}, queueName: {}", ownerAccountId, queueName);
        if (ownerAccountId != null && session instanceof SQSSession) {
            // Resolve the queue in the explicitly named owner account
            queue = ((SQSSession) session).createQueue(queueName, ownerAccountId);
        } else {
            queue = session.createQueue(queueName);
        }
        return queue;
    }
}
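For completeness, this is roughly how the resolver can be plugged into the listener container factory in place of DynamicDestinationResolver. Treat it as a sketch under assumptions: the class name SqsJmsConfig and the property key sqs.owner-account-id are hypothetical, SQSConnectionFactory is assumed to be available as a Spring bean, and CustomDynamicDestinationResolver is assumed to be visible from this class (same package or imported).

```java
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class SqsJmsConfig {

    // Hypothetical property holding the AWS account ID that owns the queue
    // for this environment (Dev or QA).
    @Value("${sqs.owner-account-id}")
    private String ownerAccountId;

    @Bean("jmsListenerContainerFactory")
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            SQSConnectionFactory sqsConnectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(sqsConnectionFactory);
        // Resolve queue names against the configured owner account
        // rather than the account behind the credentials.
        factory.setDestinationResolver(new CustomDynamicDestinationResolver(ownerAccountId));
        factory.setConcurrency("3-10");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }
}
```

The @JmsListener method itself does not need to change, since it refers to the container factory by bean name.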