Spring 引导中具有不同凭证的多个 AWS SQS Queues

Multiple AWS SQS Queues with different Credentials in Spring Boot

我有一个 Spring 引导应用程序并希望从多个 AWS SQS queue 接收消息。 这些 queue 都有自己的凭据(遗憾的是我对此无能为力)。 None 这些凭据可以访问其他 queue 之一,它们都仅限于一个 queue。

只有一张queue和证件,很简单。我只需要提供作为 AWSCredentialsProvider Bean 的凭据,并用 @SqsListener \ @EnableSqs.
注释我的方法 但是我不知道如何使用多个凭据来做到这一点。

@SqsListener 注释无法提供凭证、预配置的 AmazonSqs object 或任何其他有帮助的东西。

我通过扩展 CredentialsProviderAmazonSqs 客户端搜索了一种将 queue 映射到凭据的方法,但无济于事。
我什至试图在 AmazonHttpClient 的 header 中注入凭据,但这也是不可能的。

我试图创建手动收听 SQS queue 所需的一切。 但我坚持为 SimpleMessageListenerContainer.
创建 MessageHandler 必需的 QueueMessageHandler 仅在创建为 bean 时才有效,并带有应用程序上下文。 否则它不会查找用 @SqsListener.
注释的方法 可悲的是,我能找到的唯一教程或示例使用 JMS,我想避免,或者只使用 @SqsListener 注释和一个 queue.

是否有任何其他方法可以为多个 queue 提供不同的凭据?

我的测试代码:

@Component
@Slf4j
public class TestOneQueueA {

  public static final String QUEUE_A = "TestOneQueueA";

  public TestOneQueueA(Cloud cloud, ResourceIdResolver resourceIdResolver) {
    SqsServiceInfo serviceInfo = (SqsServiceInfo) cloud.getServiceInfo(QUEUE_A);
    AWSStaticCredentialsProvider credentialsProvider =
        new AWSStaticCredentialsProvider(new BasicAWSCredentials(serviceInfo.getAccessKey(),
            serviceInfo.getSecretAccessKey()));

    AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withRegion(serviceInfo.getRegion()).build();

    QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
    queueMessageHandlerFactory.setAmazonSqs(client);
    queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(new MappingJackson2MessageConverter()));

    QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
    queueMessageHandler.afterPropertiesSet(); // won't do anything because of no ApplicationContext

    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(client);
    factory.setResourceIdResolver(resourceIdResolver);
    factory.setQueueMessageHandler(queueMessageHandler);

    SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    try {
      simpleMessageListenerContainer.afterPropertiesSet();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    simpleMessageListenerContainer.start();
    simpleMessageListenerContainer.start(QUEUE_A); // fails with "Queue with name 'TestOneQueueA' does not exist"
  }

  @SqsListener(value = QUEUE_A, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
  public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
    log.info("Received SQS Message: \nSubject: %s \n%s", subject, dto);
  }
}

编辑:

在尝试更多之后,我能够将我的 AmazonSQS 客户端注入到两个单独的 SimpleMessageListenerContainer 中。那么问题就变成了QueueMessageHandler

如果我在没有 bean 上下文的情况下手动创建它,它根本不会查找任何带有 @SqsListener 注释的方法。并且无法手动设置处理程序。
如果我将它创建为 bean,它将查看每个 bean 的注解。所以它也会找到它不应该寻找的 queue 的方法。然后它会崩溃,因为凭据不起作用。
我想不出一种方法来为单个 SqsListener 方法创建 QueueMessageHandler
SimpleMessageListenerContainer 除了 QueueMessageHandler.

什么都不接受

您可以为要使用自定义 @Qualifier 的帐户声明不同的 @Bean。假设您在两个不同的账户中有两个 SQS 队列。然后声明两个AmazonSQS.

类型的bean
@Qualifier("first")
@Bean public AmazonSQS amazonSQSClient() {
    return AmazonSQSClient.builder()
            .withCredentials(credentialsProvider())
            .withRegion(Regions.US_EAST_1)
            .build();
}

@Qualifier("second")
@Bean public AmazonSQS amazonSQSClient() {
    return AmazonSQSClient.builder()
            .withCredentials(anotherCredentialsProvider())
            .withRegion(Regions.US_EAST_1)
            .build();
}

然后在你的服务中,你可以@Autowired他们。

@Autowired @Qualifier("second") private AmazonSQS sqsSecond;

在花了一些时间寻找更好的解决方案后,我坚持了以下几点:

package test;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.AwsRegionProvider;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import test.TestDto;
import test.CustomQueueMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationSubject;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;

@Component
public class TestQueue {

  private static final String QUEUE_NAME = "TestQueue";
  private static final Logger log = LoggerFactory.getLogger(TestQueue.class);

  public TestQueue(AWSCredentialsProvider credentialsProvider, AwsRegionProvider regionProvider) {
    AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withRegion(regionProvider.getRegion())
        .build();

    // custom QueueMessageHandler to initialize only this queue
    CustomQueueMessageHandler queueMessageHandler = new CustomQueueMessageHandler();
    queueMessageHandler.init(this);
    queueMessageHandler.afterPropertiesSet();

    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(client);
    factory.setQueueMessageHandler(queueMessageHandler);

    SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    try {
      simpleMessageListenerContainer.afterPropertiesSet();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    simpleMessageListenerContainer.start();
  }

  @SqsListener(value = QUEUE_NAME, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
  public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
    log.info("Received SQS Message: \nSubject: {} \n{}", subject, dto);
  }
}

和自定义 QueueMessageHandler:

package test;

import java.util.Collections;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

public class CustomQueueMessageHandler extends QueueMessageHandler {

  public void init(Object handler) {
    detectHandlerMethods(handler);
  }
}

CustomQueueMessageHandler 的唯一目的是将单个对象传递到应该扫描 SQS 注释的位置。 由于我不使用 Spring 上下文启动它,因此它不会在每个 bean 中搜索 @SqsListener 注释。 但是所有的初始化都隐藏在受保护的方法后面。 这就是为什么我需要覆盖 class 以访问那些初始化方法。

我不认为这是一个非常优雅的解决方案,手动创建所有 AWS 客户端内容,并调用 bean init 方法。 但这是我能找到的唯一仍然可以访问 AWS SQS 库所有功能的解决方案,例如转换传入消息和通知、删除策略、包括故障处理在内的队列轮询等。