Sending-receiving collection 在 Spring 引导中通过 RabbitMQ
Sending-receiving collection over RabbitMQ in Spring Boot
假定配置,应用在发送端和接收端:
@Configuration
@EnableRabbit
public class EventsConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
final var rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
return rabbitTemplate;
}
}
还假设在接收端有一个简单的 Spring Boot RabbitMQ 侦听器,声明如下:
@RabbitListener(queues = "${amqp.inbox}")
public void listen(org.springframework.amqp.core.Message message) {
...
}
尝试接收 List<TaskAssignment>
的 collection(其中 TaskAssignment
是一个具有两个 UUID 的简单 POJO),由 rabbit 模板发送,在接收方结束,异常:
accounting-service_1 | 01:26:39.610 WARN [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy] - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}, messageId=83f6bb30-a2b8-434a-867e-08684241e9b0, type=TASKS_REASSIGNED, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=outbox, receivedRoutingKey=, deliveryTag=3, consumerTag=amq.ctag-PodhhDJx6JtYsjIg6Cof1Q, consumerQueue=accounting-service])
accounting-service_1 | 01:26:39.610 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] - Execution of Rabbit message listener failed, and the error handler threw an exception
accounting-service_1 | org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
accounting-service_1 | at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
accounting-service_1 | at java.base/java.lang.Thread.run(Unknown Source)
accounting-service_1 | Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:146)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
accounting-service_1 | ... 6 common frames omitted
accounting-service_1 | Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:344)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:347)
accounting-service_1 | at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:136)
accounting-service_1 | ... 11 common frames omitted
accounting-service_1 | Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `byte` from Object value (token `JsonToken.START_OBJECT`)
accounting-service_1 | at [Source: (String)"[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]"; line: 1, column: 2] (through reference chain: byte[][0])
accounting-service_1 | at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
Jackson 的 Vavr 模块安装在 ObjectMapper
,所以根本原因不是 collection 错误和不可序列化。
堆栈跟踪值得注意的是 headers 部分:
headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}
我的理论是:由于侦听器已收到一个 JSON 数组,并且 Jackson 需要提供显式 TypeReference
以推导一个 POJO,反序列化数组的项目,这就是Jackson2MessageConverter
发生故障原因。没有人给它内容类型。
问题是 - 如何向 Jackson2MessageConverter
提供此类信息? RabbitTemplate
好像也没给什么办法。 __ContentTypeId__
在发件人端的显式设置也无济于事。
或者如何至少克服接收端的 Jackson 反序列化并仅收听 @RabbitListener
中具有 byte[]
有效负载的原始消息?
UPD:制作了一个重现问题的项目:https://www.dropbox.com/s/rde9u02mxob189w/proba_amqp.zip?dl=0
看看这个示例是否对您有所帮助:
https://github.com/spring-projects/spring-amqp-samples/tree/main/spring-rabbit-json
文档在这里:https://docs.spring.io/spring-amqp/docs/current/reference/html/#json-message-converter
更新
看看如果在转换器上使用此选项会有什么不同:
/**
* When false (default), fall back to type id headers if the type (or contents of a container
* type) is abstract. Set to true if conversion should always be attempted - perhaps because
* a custom deserializer has been configured on the {@link ObjectMapper}. If the attempt fails,
* fall back to headers.
* @param alwaysAttemptConversion true to attempt.
* @since 2.2.8
*/
public void setAlwaysConvertToInferredType(boolean alwaysAttemptConversion) {
您正在尝试接收原始的、未转换的 Message
。
尝试 listen(List<Foo> foos)
,这样我们就可以将推断的类型传递给转换器。
编辑
问题是您的集合对象与 Jackson 不兼容。
这个有效
@PostConstruct
public void init() {
List<POJO> payload = new ArrayList<>();
payload.add(new POJO(1,2));
payload.add(new POJO(3,4));
//POJO payload = new POJO(1,2);
template.convertAndSend(
outbox,
"",
payload,
message -> {
var props = message.getMessageProperties();
props.setMessageId(UUID.randomUUID().toString());
return message;
}
);
}
@RabbitListener(queues = "${amqp.inbox}")
public void listen(List<POJO> pojos) {
System.out.println("Message reached:" + pojos);
}
Message reached:[com.skapral.POJO@7af8fa38, com.skapral.POJO@5569d76c]
来自作者:
这个答案给了我一个关于根本原因的提示。事实证明 vavr-jackson
- 支持 Vavr 集合的 Jackson 模块存在错误。
假定配置,应用在发送端和接收端:
@Configuration
@EnableRabbit
public class EventsConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
final var rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
return rabbitTemplate;
}
}
还假设在接收端有一个简单的 Spring Boot RabbitMQ 侦听器,声明如下:
@RabbitListener(queues = "${amqp.inbox}")
public void listen(org.springframework.amqp.core.Message message) {
...
}
尝试接收 List<TaskAssignment>
的 collection(其中 TaskAssignment
是一个具有两个 UUID 的简单 POJO),由 rabbit 模板发送,在接收方结束,异常:
accounting-service_1 | 01:26:39.610 WARN [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy] - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}, messageId=83f6bb30-a2b8-434a-867e-08684241e9b0, type=TASKS_REASSIGNED, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=outbox, receivedRoutingKey=, deliveryTag=3, consumerTag=amq.ctag-PodhhDJx6JtYsjIg6Cof1Q, consumerQueue=accounting-service])
accounting-service_1 | 01:26:39.610 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] - Execution of Rabbit message listener failed, and the error handler threw an exception
accounting-service_1 | org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
accounting-service_1 | at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
accounting-service_1 | at java.base/java.lang.Thread.run(Unknown Source)
accounting-service_1 | Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:146)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
accounting-service_1 | ... 6 common frames omitted
accounting-service_1 | Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:344)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:347)
accounting-service_1 | at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:136)
accounting-service_1 | ... 11 common frames omitted
accounting-service_1 | Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `byte` from Object value (token `JsonToken.START_OBJECT`)
accounting-service_1 | at [Source: (String)"[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]"; line: 1, column: 2] (through reference chain: byte[][0])
accounting-service_1 | at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
Jackson 的 Vavr 模块安装在 ObjectMapper
,所以根本原因不是 collection 错误和不可序列化。
堆栈跟踪值得注意的是 headers 部分:
headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}
我的理论是:由于侦听器已收到一个 JSON 数组,并且 Jackson 需要提供显式 TypeReference
以推导一个 POJO,反序列化数组的项目,这就是Jackson2MessageConverter
发生故障原因。没有人给它内容类型。
问题是 - 如何向 Jackson2MessageConverter
提供此类信息? RabbitTemplate
好像也没给什么办法。 __ContentTypeId__
在发件人端的显式设置也无济于事。
或者如何至少克服接收端的 Jackson 反序列化并仅收听 @RabbitListener
中具有 byte[]
有效负载的原始消息?
UPD:制作了一个重现问题的项目:https://www.dropbox.com/s/rde9u02mxob189w/proba_amqp.zip?dl=0
看看这个示例是否对您有所帮助:
https://github.com/spring-projects/spring-amqp-samples/tree/main/spring-rabbit-json
文档在这里:https://docs.spring.io/spring-amqp/docs/current/reference/html/#json-message-converter
更新
看看如果在转换器上使用此选项会有什么不同:
/**
* When false (default), fall back to type id headers if the type (or contents of a container
* type) is abstract. Set to true if conversion should always be attempted - perhaps because
* a custom deserializer has been configured on the {@link ObjectMapper}. If the attempt fails,
* fall back to headers.
* @param alwaysAttemptConversion true to attempt.
* @since 2.2.8
*/
public void setAlwaysConvertToInferredType(boolean alwaysAttemptConversion) {
您正在尝试接收原始的、未转换的 Message
。
尝试 listen(List<Foo> foos)
,这样我们就可以将推断的类型传递给转换器。
编辑
问题是您的集合对象与 Jackson 不兼容。
这个有效
@PostConstruct
public void init() {
List<POJO> payload = new ArrayList<>();
payload.add(new POJO(1,2));
payload.add(new POJO(3,4));
//POJO payload = new POJO(1,2);
template.convertAndSend(
outbox,
"",
payload,
message -> {
var props = message.getMessageProperties();
props.setMessageId(UUID.randomUUID().toString());
return message;
}
);
}
@RabbitListener(queues = "${amqp.inbox}")
public void listen(List<POJO> pojos) {
System.out.println("Message reached:" + pojos);
}
Message reached:[com.skapral.POJO@7af8fa38, com.skapral.POJO@5569d76c]
来自作者:
这个答案给了我一个关于根本原因的提示。事实证明 vavr-jackson
- 支持 Vavr 集合的 Jackson 模块存在错误。