Spring Cloud AWS 设置手动确认 SQS 消息的问题

Spring Cloud AWS Issue with setting manual acknowledge of SQS message

我正在尝试通过使用 spring-cloud-aws-messaging 手动删除 AWS SQS 消息来实现逻辑。此功能已在 this ticket from the example in tests

范围内实施
@SqsListener(value = "queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(SqsEventDTO message, Acknowledgment acknowledgment) {

    LOGGER.info("Received message {}", message.getFoo());
    try {
        acknowledgment.acknowledge().get();
    } catch (InterruptedException e) {
        LOGGER.error("Opps", e);
    } catch (ExecutionException e) {
        LOGGER.error("Opps", e);
    }
}

但是遇到了意外的异常

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance oforg.springframework.cloud.aws.messaging.listener.Acknowledgment(no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

SqsMessageDeletionPolicy.ON_SUCCESS 的解决方案有效,但我想避免抛出异常。

我在配置中遗漏了什么?

它花了一些时间摆弄并尝试了与其他 SO 答案不同的东西。

这是我的代码,我会尽力解释。我包括了我用于 SQS 消费者的所有内容。

我的配置 class 如下。下面唯一需要注意的不太明显的事情是在 queueMessageHandlerFactory 方法中实例化的转换器和解析器对象。 MappingJackson2MessageConverter(以防从非常明显的 class 名称看不明显)class 处理来自 SQS 的有效负载的反序列化。

将严格的内容类型匹配设置为 false 也很重要。

此外,MappingJackson2MessageConverter 允许您设置自己的 Jackson ObjectMapper,但是如果您这样做,则需要按如下方式配置它:

objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

你可能不想那样做,所以你可以将它留空,它会创建自己的 ObjectMapper。

我认为代码的其余部分是不言自明的...?如果没有请告诉我。

我们的用例之间的一个区别是,您似乎在映射自己的自定义对象 (SqsEventDTO),我认为这可行吗?在那种情况下,我认为您不需要 MappingJackson2MessageConverter,但我可能是错的。

@Configuration
public class AppConfig {

@Bean
@Primary
public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
    return queueMessageHandlerFactory.createQueueMessageHandler();
}

@Bean
@Primary
public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient) {

    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    factory.setAmazonSqs(sqsClient);

    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
    messageConverter.setSerializedPayloadClass(String.class);

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);

    // Uses the MappingJackson2MessageConverter object to resolve/map 
    // the payload against the Message/S3EventNotification argument.
    PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);

    // Extract the acknowledgment data from the payload's headers, 
    // which then gets deserialized into the Acknowledgment object.  
    AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");

    // I don't remember the specifics of WHY, however there is 
    // something important about the order of the argument resolvers 
    // in the list
    factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));

    return factory;
}

@Bean("ConsumerBean")
@Primary
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
    @Autowired ThreadPoolTaskExecutor threadPoolExecutor) {

    SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
    smlc.setWaitTimeOut(20);
    smlc.setAmazonSqs(amazonSQSAsync);
    smlc.setMessageHandler(queueMessageHandler);
    smlc.setBeanName("ConsumerBean");
    smlc.setMaxNumberOfMessages(sqsMaxMessages);
    smlc.setTaskExecutor(threadPoolExecutor);

    return smlc;
}

@Bean
@Primary
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(corePoolSize);
    executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setMaxPoolSize(maxPoolSize);
    executor.setKeepAliveSeconds(threadTimeoutSeconds);
    executor.setThreadNamePrefix(threadName);
    executor.initialize();

    return executor;
}
}

下面是我的 SQS 消费者服务 class。

@Service
public class RawConsumer {

@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "${input.sqs.queuename}")
public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception {
    // Handle event here
}

希望对您有所帮助,如果您有任何问题,请告诉我。

问题作者没有提到的是他试图定制杰克逊ObjectMapper。因此,他实例化了一个 MappingJackson2MessageConverter,将其包装在 PayloadArgumentResolver 中并将其设置为 QueueMessageHandlerFactory.setArgumentResolvers() 上的单个 HandlerMethodArgumentResolver。这样做会覆盖 QueueMessageHandler.initArgumentResolvers() 中定义的默认参数解析器列表(在 QueueMessageHandlerFactory 中创建 QueueMessageHandler 的实例时调用)。

当例如只有 PayloadArgumentResolver 被设置为单参数解析器,Acknowledgement 参数不能再绑定了。

为了自定义 Jackson 消息转换器,比覆盖参数解析器列表更好的解决方案是在 QueueMessageHandlerFactory:

上设置消息转换器列表
    @Bean
    fun queueMessageHandlerFactory(objectMapper: ObjectMapper): QueueMessageHandlerFactory {
        val factory = QueueMessageHandlerFactory()

        val messageConverter = MappingJackson2MessageConverter()
        messageConverter.objectMapper = objectMapper

        factory.setMessageConverters(listOf(messageConverter)) // <-- this is the important line.
        return factory
    }

注册的MessageConvertersQueueMessageHandler.initArgumentResolvers()里面用作PayloadArgumentResolvers

因此,这是一个侵入性较小的更改。