Spring Cloud AWS 设置手动确认 SQS 消息的问题
Spring Cloud AWS Issue with setting manual acknowledge of SQS message
我正在尝试通过使用 spring-cloud-aws-messaging 手动删除 AWS SQS 消息来实现逻辑。此功能已在 this ticket from the example in tests
范围内实施
@SqsListener(value = "queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(SqsEventDTO message, Acknowledgment acknowledgment) {
LOGGER.info("Received message {}", message.getFoo());
try {
acknowledgment.acknowledge().get();
} catch (InterruptedException e) {
LOGGER.error("Opps", e);
} catch (ExecutionException e) {
LOGGER.error("Opps", e);
}
}
但是遇到了意外的异常
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of
org.springframework.cloud.aws.messaging.listener.Acknowledgment(no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
SqsMessageDeletionPolicy.ON_SUCCESS
的解决方案有效,但我想避免抛出异常。
我在配置中遗漏了什么?
它花了一些时间摆弄并尝试了与其他 SO 答案不同的东西。
这是我的代码,我会尽力解释。我包括了我用于 SQS 消费者的所有内容。
我的配置 class 如下。下面唯一需要注意的不太明显的事情是在 queueMessageHandlerFactory 方法中实例化的转换器和解析器对象。 MappingJackson2MessageConverter(以防从非常明显的 class 名称看不明显)class 处理来自 SQS 的有效负载的反序列化。
将严格的内容类型匹配设置为 false 也很重要。
此外,MappingJackson2MessageConverter 允许您设置自己的 Jackson ObjectMapper,但是如果您这样做,则需要按如下方式配置它:
objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
你可能不想那样做,所以你可以将它留空,它会创建自己的 ObjectMapper。
我认为代码的其余部分是不言自明的...?如果没有请告诉我。
我们的用例之间的一个区别是,您似乎在映射自己的自定义对象 (SqsEventDTO),我认为这可行吗?在那种情况下,我认为您不需要 MappingJackson2MessageConverter,但我可能是错的。
@Configuration
public class AppConfig {
@Bean
@Primary
public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
return queueMessageHandlerFactory.createQueueMessageHandler();
}
@Bean
@Primary
public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient) {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
factory.setAmazonSqs(sqsClient);
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setSerializedPayloadClass(String.class);
//set strict content type match to false
messageConverter.setStrictContentTypeMatch(false);
// Uses the MappingJackson2MessageConverter object to resolve/map
// the payload against the Message/S3EventNotification argument.
PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);
// Extract the acknowledgment data from the payload's headers,
// which then gets deserialized into the Acknowledgment object.
AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");
// I don't remember the specifics of WHY, however there is
// something important about the order of the argument resolvers
// in the list
factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));
return factory;
}
@Bean("ConsumerBean")
@Primary
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
@Autowired ThreadPoolTaskExecutor threadPoolExecutor) {
SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
smlc.setWaitTimeOut(20);
smlc.setAmazonSqs(amazonSQSAsync);
smlc.setMessageHandler(queueMessageHandler);
smlc.setBeanName("ConsumerBean");
smlc.setMaxNumberOfMessages(sqsMaxMessages);
smlc.setTaskExecutor(threadPoolExecutor);
return smlc;
}
@Bean
@Primary
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setMaxPoolSize(maxPoolSize);
executor.setKeepAliveSeconds(threadTimeoutSeconds);
executor.setThreadNamePrefix(threadName);
executor.initialize();
return executor;
}
}
下面是我的 SQS 消费者服务 class。
@Service
public class RawConsumer {
@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "${input.sqs.queuename}")
public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception {
// Handle event here
}
希望对您有所帮助,如果您有任何问题,请告诉我。
问题作者没有提到的是他试图定制杰克逊ObjectMapper
。因此,他实例化了一个 MappingJackson2MessageConverter
,将其包装在 PayloadArgumentResolver
中并将其设置为 QueueMessageHandlerFactory.setArgumentResolvers()
上的单个 HandlerMethodArgumentResolver
。这样做会覆盖 QueueMessageHandler.initArgumentResolvers()
中定义的默认参数解析器列表(在 QueueMessageHandlerFactory
中创建 QueueMessageHandler
的实例时调用)。
当例如只有 PayloadArgumentResolver
被设置为单参数解析器,Acknowledgement
参数不能再绑定了。
为了自定义 Jackson 消息转换器,比覆盖参数解析器列表更好的解决方案是在 QueueMessageHandlerFactory
:
上设置消息转换器列表
@Bean
fun queueMessageHandlerFactory(objectMapper: ObjectMapper): QueueMessageHandlerFactory {
val factory = QueueMessageHandlerFactory()
val messageConverter = MappingJackson2MessageConverter()
messageConverter.objectMapper = objectMapper
factory.setMessageConverters(listOf(messageConverter)) // <-- this is the important line.
return factory
}
注册的MessageConverters
在QueueMessageHandler.initArgumentResolvers()
里面用作PayloadArgumentResolvers
。
因此,这是一个侵入性较小的更改。
我正在尝试通过使用 spring-cloud-aws-messaging 手动删除 AWS SQS 消息来实现逻辑。此功能已在 this ticket from the example in tests
范围内实施@SqsListener(value = "queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(SqsEventDTO message, Acknowledgment acknowledgment) {
LOGGER.info("Received message {}", message.getFoo());
try {
acknowledgment.acknowledge().get();
} catch (InterruptedException e) {
LOGGER.error("Opps", e);
} catch (ExecutionException e) {
LOGGER.error("Opps", e);
}
}
但是遇到了意外的异常
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of
org.springframework.cloud.aws.messaging.listener.Acknowledgment(no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
SqsMessageDeletionPolicy.ON_SUCCESS
的解决方案有效,但我想避免抛出异常。
我在配置中遗漏了什么?
它花了一些时间摆弄并尝试了与其他 SO 答案不同的东西。
这是我的代码,我会尽力解释。我包括了我用于 SQS 消费者的所有内容。
我的配置 class 如下。下面唯一需要注意的不太明显的事情是在 queueMessageHandlerFactory 方法中实例化的转换器和解析器对象。 MappingJackson2MessageConverter(以防从非常明显的 class 名称看不明显)class 处理来自 SQS 的有效负载的反序列化。
将严格的内容类型匹配设置为 false 也很重要。
此外,MappingJackson2MessageConverter 允许您设置自己的 Jackson ObjectMapper,但是如果您这样做,则需要按如下方式配置它:
objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
你可能不想那样做,所以你可以将它留空,它会创建自己的 ObjectMapper。
我认为代码的其余部分是不言自明的...?如果没有请告诉我。
我们的用例之间的一个区别是,您似乎在映射自己的自定义对象 (SqsEventDTO),我认为这可行吗?在那种情况下,我认为您不需要 MappingJackson2MessageConverter,但我可能是错的。
@Configuration
public class AppConfig {
@Bean
@Primary
public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
return queueMessageHandlerFactory.createQueueMessageHandler();
}
@Bean
@Primary
public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient) {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
factory.setAmazonSqs(sqsClient);
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setSerializedPayloadClass(String.class);
//set strict content type match to false
messageConverter.setStrictContentTypeMatch(false);
// Uses the MappingJackson2MessageConverter object to resolve/map
// the payload against the Message/S3EventNotification argument.
PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);
// Extract the acknowledgment data from the payload's headers,
// which then gets deserialized into the Acknowledgment object.
AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");
// I don't remember the specifics of WHY, however there is
// something important about the order of the argument resolvers
// in the list
factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));
return factory;
}
@Bean("ConsumerBean")
@Primary
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
@Autowired ThreadPoolTaskExecutor threadPoolExecutor) {
SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
smlc.setWaitTimeOut(20);
smlc.setAmazonSqs(amazonSQSAsync);
smlc.setMessageHandler(queueMessageHandler);
smlc.setBeanName("ConsumerBean");
smlc.setMaxNumberOfMessages(sqsMaxMessages);
smlc.setTaskExecutor(threadPoolExecutor);
return smlc;
}
@Bean
@Primary
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setMaxPoolSize(maxPoolSize);
executor.setKeepAliveSeconds(threadTimeoutSeconds);
executor.setThreadNamePrefix(threadName);
executor.initialize();
return executor;
}
}
下面是我的 SQS 消费者服务 class。
@Service
public class RawConsumer {
@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "${input.sqs.queuename}")
public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception {
// Handle event here
}
希望对您有所帮助,如果您有任何问题,请告诉我。
问题作者没有提到的是他试图定制杰克逊ObjectMapper
。因此,他实例化了一个 MappingJackson2MessageConverter
,将其包装在 PayloadArgumentResolver
中并将其设置为 QueueMessageHandlerFactory.setArgumentResolvers()
上的单个 HandlerMethodArgumentResolver
。这样做会覆盖 QueueMessageHandler.initArgumentResolvers()
中定义的默认参数解析器列表(在 QueueMessageHandlerFactory
中创建 QueueMessageHandler
的实例时调用)。
当例如只有 PayloadArgumentResolver
被设置为单参数解析器,Acknowledgement
参数不能再绑定了。
为了自定义 Jackson 消息转换器,比覆盖参数解析器列表更好的解决方案是在 QueueMessageHandlerFactory
:
@Bean
fun queueMessageHandlerFactory(objectMapper: ObjectMapper): QueueMessageHandlerFactory {
val factory = QueueMessageHandlerFactory()
val messageConverter = MappingJackson2MessageConverter()
messageConverter.objectMapper = objectMapper
factory.setMessageConverters(listOf(messageConverter)) // <-- this is the important line.
return factory
}
注册的MessageConverters
在QueueMessageHandler.initArgumentResolvers()
里面用作PayloadArgumentResolvers
。
因此,这是一个侵入性较小的更改。