使用@SqsListener 从不同的 AWS 账户读取消息
Read messages from different AWS account using @SqsListener
我有一个由第三方供应商提供的 SQS 标准队列,该供应商已授予我们的 IAM 用户从那里读取消息的权限。因此队列的 AWS 账户 ID 与我的用户不同。
我正在尝试使用 spring 的 @SqsListener
注释来使用这些消息,但我在指定应该使用的 accountId 时遇到了问题。
我的客户端 bean 配置如下所示:
@Bean
fun amazonSQSAsyncClient(): AmazonSQSAsync = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(AWSStaticCredentialsProvider(BasicAWSCredentials(awsProperties.accessKey, awsProperties.secretKey)))
.withEndpointConfiguration(AwsClientBuilder.EndpointConfiguration(awsProperties.url, awsProperties.region))
.build()
我看不到在凭据中指定帐户 ID 的方法,而且我也找不到任何可用于定义帐户 ID 的属性。
我尝试将上面显示的 awsProperties.url
设置为类似 https://sqs.us-east-1.amazonaws.com/<accountId>
的值,但这似乎不起作用。它仍在尝试在我自己的帐户 ID 中查找队列并抛出一个找不到队列的错误。
知道如何解决这个问题并强制 Spring AWS bean 从特定的 AwsAccount 使用吗?
您有一个用户可以访问另一个帐户中的队列。这意味着您可以 运行 在您的帐户中使用该用户进行编码,并且可以访问另一个帐户中的队列。
初始化 sqsclient 将始终使用它运行正在使用的帐户
您不必调整它。
@Bean
fun amazonSQSAsyncClient(): AmazonSQSAsync = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(AWSStaticCredentialsProvider(BasicAWSCredentials(awsProperties.accessKey, awsProperties.secretKey)))
.build()
您需要确保代码可以访问队列。
在代码中,您应该像这样设置队列 URL:
https://sqs.<region>.amazonaws.com/<account>/<queuename>
,我很快尝试从另一个帐户访问队列。如果正确设置了队列的权限,则有两种可能性。第一个是使用队列 URL 而不是名称(我检查过,它有效)。第二个是创建您自己的 DestinationResolver 并将其提供给 SimpleMessageListenerContainer。我用 Spring Boot 创建了一个小应用程序,它运行良好。我把下面的代码贴给你了。
在下一个功能版本中,我会想出更好的方法来支持这个用例。
package demo;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.support.destination.DynamicQueueUrlDestinationResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.messaging.core.DestinationResolver;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.util.Assert;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public MessageListener messageListener() {
return new MessageListener();
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerFactory(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setDestinationResolver(new DynamicAccountAwareQueueUrlDestinationResolver(amazonSqs, resourceIdResolver));
return factory;
}
public static class DynamicAccountAwareQueueUrlDestinationResolver implements DestinationResolver<String> {
public static final String ACCOUNT_QUEUE_SEPARATOR = ":";
private final AmazonSQS amazonSqs;
private final DynamicQueueUrlDestinationResolver dynamicQueueUrlDestinationResolverDelegate;
public DynamicAccountAwareQueueUrlDestinationResolver(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
Assert.notNull(amazonSqs, "amazonSqs must not be null");
this.amazonSqs = amazonSqs;
this.dynamicQueueUrlDestinationResolverDelegate = new DynamicQueueUrlDestinationResolver(amazonSqs, resourceIdResolver);
}
@Override
public String resolveDestination(String queue) throws DestinationResolutionException {
if (queue.contains(ACCOUNT_QUEUE_SEPARATOR)) {
String account = queue.substring(0, queue.indexOf(ACCOUNT_QUEUE_SEPARATOR));
String queueName = queue.substring(queue.indexOf(ACCOUNT_QUEUE_SEPARATOR) + 1);
GetQueueUrlResult queueUrlResult = this.amazonSqs.getQueueUrl(new GetQueueUrlRequest()
.withQueueName(queueName)
.withQueueOwnerAWSAccountId(account));
return queueUrlResult.getQueueUrl();
} else {
return this.dynamicQueueUrlDestinationResolverDelegate.resolveDestination(queue);
}
}
}
public static class MessageListener {
private static Logger LOG = LoggerFactory.getLogger(MessageListener.class);
@MessageMapping("633332177961:queue-name")
public void listen(String message) {
LOG.info("Received message: {}", message);
}
}
}
我有一个由第三方供应商提供的 SQS 标准队列,该供应商已授予我们的 IAM 用户从那里读取消息的权限。因此队列的 AWS 账户 ID 与我的用户不同。
我正在尝试使用 spring 的 @SqsListener
注释来使用这些消息,但我在指定应该使用的 accountId 时遇到了问题。
我的客户端 bean 配置如下所示:
@Bean
fun amazonSQSAsyncClient(): AmazonSQSAsync = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(AWSStaticCredentialsProvider(BasicAWSCredentials(awsProperties.accessKey, awsProperties.secretKey)))
.withEndpointConfiguration(AwsClientBuilder.EndpointConfiguration(awsProperties.url, awsProperties.region))
.build()
我看不到在凭据中指定帐户 ID 的方法,而且我也找不到任何可用于定义帐户 ID 的属性。
我尝试将上面显示的 awsProperties.url
设置为类似 https://sqs.us-east-1.amazonaws.com/<accountId>
的值,但这似乎不起作用。它仍在尝试在我自己的帐户 ID 中查找队列并抛出一个找不到队列的错误。
知道如何解决这个问题并强制 Spring AWS bean 从特定的 AwsAccount 使用吗?
您有一个用户可以访问另一个帐户中的队列。这意味着您可以 运行 在您的帐户中使用该用户进行编码,并且可以访问另一个帐户中的队列。
初始化 sqsclient 将始终使用它运行正在使用的帐户 您不必调整它。
@Bean
fun amazonSQSAsyncClient(): AmazonSQSAsync = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(AWSStaticCredentialsProvider(BasicAWSCredentials(awsProperties.accessKey, awsProperties.secretKey)))
.build()
您需要确保代码可以访问队列。
在代码中,您应该像这样设置队列 URL:
https://sqs.<region>.amazonaws.com/<account>/<queuename>
,我很快尝试从另一个帐户访问队列。如果正确设置了队列的权限,则有两种可能性。第一个是使用队列 URL 而不是名称(我检查过,它有效)。第二个是创建您自己的 DestinationResolver 并将其提供给 SimpleMessageListenerContainer。我用 Spring Boot 创建了一个小应用程序,它运行良好。我把下面的代码贴给你了。
在下一个功能版本中,我会想出更好的方法来支持这个用例。
package demo;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.support.destination.DynamicQueueUrlDestinationResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.messaging.core.DestinationResolver;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.util.Assert;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public MessageListener messageListener() {
return new MessageListener();
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerFactory(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setDestinationResolver(new DynamicAccountAwareQueueUrlDestinationResolver(amazonSqs, resourceIdResolver));
return factory;
}
public static class DynamicAccountAwareQueueUrlDestinationResolver implements DestinationResolver<String> {
public static final String ACCOUNT_QUEUE_SEPARATOR = ":";
private final AmazonSQS amazonSqs;
private final DynamicQueueUrlDestinationResolver dynamicQueueUrlDestinationResolverDelegate;
public DynamicAccountAwareQueueUrlDestinationResolver(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
Assert.notNull(amazonSqs, "amazonSqs must not be null");
this.amazonSqs = amazonSqs;
this.dynamicQueueUrlDestinationResolverDelegate = new DynamicQueueUrlDestinationResolver(amazonSqs, resourceIdResolver);
}
@Override
public String resolveDestination(String queue) throws DestinationResolutionException {
if (queue.contains(ACCOUNT_QUEUE_SEPARATOR)) {
String account = queue.substring(0, queue.indexOf(ACCOUNT_QUEUE_SEPARATOR));
String queueName = queue.substring(queue.indexOf(ACCOUNT_QUEUE_SEPARATOR) + 1);
GetQueueUrlResult queueUrlResult = this.amazonSqs.getQueueUrl(new GetQueueUrlRequest()
.withQueueName(queueName)
.withQueueOwnerAWSAccountId(account));
return queueUrlResult.getQueueUrl();
} else {
return this.dynamicQueueUrlDestinationResolverDelegate.resolveDestination(queue);
}
}
}
public static class MessageListener {
private static Logger LOG = LoggerFactory.getLogger(MessageListener.class);
@MessageMapping("633332177961:queue-name")
public void listen(String message) {
LOG.info("Received message: {}", message);
}
}
}