使用@SqsListener 从不同的 AWS 账户读取消息

Read messages from different AWS account using @SqsListener

我有一个由第三方供应商提供的 SQS 标准队列,该供应商已授予我们的 IAM 用户从那里读取消息的权限。因此队列的 AWS 账户 ID 与我的用户不同。

我正在尝试使用 spring 的 @SqsListener 注释来使用这些消息,但我在指定应该使用的 accountId 时遇到了问题。

我的客户端 bean 配置如下所示:

@Bean
fun amazonSQSAsyncClient(): AmazonSQSAsync = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(AWSStaticCredentialsProvider(BasicAWSCredentials(awsProperties.accessKey, awsProperties.secretKey)))                
.withEndpointConfiguration(AwsClientBuilder.EndpointConfiguration(awsProperties.url, awsProperties.region))                
.build() 

我看不到在凭据中指定帐户 ID 的方法,而且我也找不到任何可用于定义帐户 ID 的属性。

我尝试将上面显示的 awsProperties.url 设置为类似 https://sqs.us-east-1.amazonaws.com/<accountId> 的值,但这似乎不起作用。它仍在尝试在我自己的帐户 ID 中查找队列并抛出一个找不到队列的错误。

知道如何解决这个问题并强制 Spring AWS bean 从特定的 AwsAccount 使用吗?

您有一个用户可以访问另一个帐户中的队列。这意味着您可以 运行 在您的帐户中使用该用户进行编码,并且可以访问另一个帐户中的队列。

初始化 sqsclient 将始终使用它运行正在使用的帐户 您不必调整它。

@Bean
fun amazonSQSAsyncClient(): AmazonSQSAsync = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(AWSStaticCredentialsProvider(BasicAWSCredentials(awsProperties.accessKey, awsProperties.secretKey)))                        
.build() 

您需要确保代码可以访问队列。

在代码中,您应该像这样设置队列 URL: https://sqs.<region>.amazonaws.com/<account>/<queuename>

,我很快尝试从另一个帐户访问队列。如果正确设置了队列的权限,则有两种可能性。第一个是使用队列 URL 而不是名称(我检查过,它有效)。第二个是创建您自己的 DestinationResolver 并将其提供给 SimpleMessageListenerContainer。我用 Spring Boot 创建了一个小应用程序,它运行良好。我把下面的代码贴给你了。

在下一个功能版本中,我会想出更好的方法来支持这个用例。

package demo;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.support.destination.DynamicQueueUrlDestinationResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.messaging.core.DestinationResolver;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.util.Assert;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public MessageListener messageListener() {
        return new MessageListener();
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerFactory(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setDestinationResolver(new DynamicAccountAwareQueueUrlDestinationResolver(amazonSqs, resourceIdResolver));

        return factory;
    }

    public static class DynamicAccountAwareQueueUrlDestinationResolver implements DestinationResolver<String> {

        public static final String ACCOUNT_QUEUE_SEPARATOR = ":";
        private final AmazonSQS amazonSqs;
        private final DynamicQueueUrlDestinationResolver dynamicQueueUrlDestinationResolverDelegate;

        public DynamicAccountAwareQueueUrlDestinationResolver(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
            Assert.notNull(amazonSqs, "amazonSqs must not be null");

            this.amazonSqs = amazonSqs;
            this.dynamicQueueUrlDestinationResolverDelegate = new DynamicQueueUrlDestinationResolver(amazonSqs, resourceIdResolver);
        }

        @Override
        public String resolveDestination(String queue) throws DestinationResolutionException {
            if (queue.contains(ACCOUNT_QUEUE_SEPARATOR)) {
                String account = queue.substring(0, queue.indexOf(ACCOUNT_QUEUE_SEPARATOR));
                String queueName = queue.substring(queue.indexOf(ACCOUNT_QUEUE_SEPARATOR) + 1);
                GetQueueUrlResult queueUrlResult = this.amazonSqs.getQueueUrl(new GetQueueUrlRequest()
                        .withQueueName(queueName)
                        .withQueueOwnerAWSAccountId(account));
                return queueUrlResult.getQueueUrl();
            } else {
                return this.dynamicQueueUrlDestinationResolverDelegate.resolveDestination(queue);
            }
        }
    }

    public static class MessageListener {

        private static Logger LOG = LoggerFactory.getLogger(MessageListener.class);

        @MessageMapping("633332177961:queue-name")
        public void listen(String message) {
            LOG.info("Received message: {}", message);
        }

    }

}