Sending and receiving a collection over RabbitMQ in Spring Boot

Assume the following configuration, applied on both the sending and the receiving side:

@Configuration
@EnableRabbit
public class EventsConfig {
    @Bean
    public Jackson2JsonMessageConverter messageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        final var rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
        return rabbitTemplate;
    }
}

Also assume that the receiving side has a simple Spring Boot RabbitMQ listener, declared like this:

    @RabbitListener(queues = "${amqp.inbox}")
    public void listen(org.springframework.amqp.core.Message message) {
        ...
    }

An attempt to receive a List<TaskAssignment> collection (where TaskAssignment is a simple POJO with two UUIDs), sent by the Rabbit template, ends with an exception on the receiver side:

accounting-service_1  | 01:26:39.610 WARN  [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy] - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}, messageId=83f6bb30-a2b8-434a-867e-08684241e9b0, type=TASKS_REASSIGNED, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=outbox, receivedRoutingKey=, deliveryTag=3, consumerTag=amq.ctag-PodhhDJx6JtYsjIg6Cof1Q, consumerQueue=accounting-service])
accounting-service_1  | 01:26:39.610 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] - Execution of Rabbit message listener failed, and the error handler threw an exception
accounting-service_1  | org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
accounting-service_1  |     at java.base/java.lang.Thread.run(Unknown Source)
accounting-service_1  | Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:146)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
accounting-service_1  |     ... 6 common frames omitted
accounting-service_1  | Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
accounting-service_1  |     at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
accounting-service_1  |     at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
accounting-service_1  |     at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:344)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:347)
accounting-service_1  |     at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:136)
accounting-service_1  |     ... 11 common frames omitted
accounting-service_1  | Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `byte` from Object value (token `JsonToken.START_OBJECT`)
accounting-service_1  |  at [Source: (String)"[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]"; line: 1, column: 2] (through reference chain: byte[][0])
accounting-service_1  |     at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)

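The Vavr module for Jackson is installed in the ObjectMapper, so the root cause is not that the collection is wrong or unserializable.

The noteworthy part of the stack trace is the headers section: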

headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}

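My theory is this: the listener has received a JSON array, and Jackson needs an explicit TypeReference to work out which POJO type to use when deserializing the array's items. That is why Jackson2JsonMessageConverter fails. Nobody gives it the content type.

The question is: how do I provide this information to Jackson2JsonMessageConverter? RabbitTemplate does not seem to offer a way to do it. Explicitly setting __ContentTypeId__ on the sender side did not help either.

Or, at the very least, how do I bypass Jackson deserialization on the receiving side and just receive the raw message with a byte[] payload in the @RabbitListener?

UPD: I made a project that reproduces the issue: https://www.dropbox.com/s/rde9u02mxob189w/proba_amqp.zip?dl=0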

See if this sample helps:

https://github.com/spring-projects/spring-amqp-samples/tree/main/spring-rabbit-json

The documentation is here: https://docs.spring.io/spring-amqp/docs/current/reference/html/#json-message-converter

UPDATE

See whether using this option on the converter makes a difference:

/**
 * When false (default), fall back to type id headers if the type (or contents of a container
 * type) is abstract. Set to true if conversion should always be attempted - perhaps because
 * a custom deserializer has been configured on the {@link ObjectMapper}. If the attempt fails,
 * fall back to headers.
 * @param alwaysAttemptConversion true to attempt.
 * @since 2.2.8
 */
public void setAlwaysConvertToInferredType(boolean alwaysAttemptConversion) {
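
As a minimal sketch of how that option might be switched on, the converter bean from the question's configuration could call the setter before it is returned. The class name `ConverterConfig` is hypothetical, and whether this alone resolves the failure in the question is not confirmed here.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the bean mirrors the one in the question.
@Configuration
public class ConverterConfig {

    @Bean
    public Jackson2JsonMessageConverter messageConverter(ObjectMapper objectMapper) {
        var converter = new Jackson2JsonMessageConverter(objectMapper);
        // Always try the type inferred from the listener method signature first,
        // falling back to the __TypeId__ headers only if that attempt fails.
        converter.setAlwaysConvertToInferredType(true);
        return converter;
    }
}
```

The setter only matters when the listener method declares a concrete payload type for the converter to infer from.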

You are trying to receive the raw, unconverted Message.

Try listen(List<Foo> foos), so that we can pass the inferred type to the converter.
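
Applied to the question, that suggestion could look roughly like the sketch below. The `TaskAssignment` class here is a hypothetical stand-in for the asker's POJO, with field names taken from the JSON body in the log, and it assumes a plain `java.util.List` parameter rather than a Vavr collection.

```java
import java.util.List;
import java.util.UUID;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TaskAssignmentListener {

    // Hypothetical stand-in for the question's POJO with two UUIDs.
    public static class TaskAssignment {
        private UUID assigneeId;
        private UUID taskId;

        public UUID getAssigneeId() { return assigneeId; }
        public void setAssigneeId(UUID assigneeId) { this.assigneeId = assigneeId; }
        public UUID getTaskId() { return taskId; }
        public void setTaskId(UUID taskId) { this.taskId = taskId; }
    }

    // The generic parameter type gives the converter an element type to infer,
    // so it does not have to rely on the type id headers alone.
    @RabbitListener(queues = "${amqp.inbox}")
    public void listen(List<TaskAssignment> assignments) {
        assignments.forEach(a ->
            System.out.println(a.getTaskId() + " -> " + a.getAssigneeId()));
    }
}
```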

EDIT

The problem is that your collection object is not compatible with Jackson.

This works:

    @PostConstruct
    public void init() {
        List<POJO> payload = new ArrayList<>();
        payload.add(new POJO(1,2));
        payload.add(new POJO(3,4));
        //POJO payload = new POJO(1,2);
        template.convertAndSend(
            outbox,
            "",
            payload,
            message -> {
                var props = message.getMessageProperties();
                props.setMessageId(UUID.randomUUID().toString());
                return message;
            }
        );
    }

    @RabbitListener(queues = "${amqp.inbox}")
    public void listen(List<POJO> pojos) {
        System.out.println("Message reached:" + pojos);
    }

Message reached:[com.skapral.POJO@7af8fa38, com.skapral.POJO@5569d76c]

From the author: this answer gave me a hint about the root cause. It turned out that there is a bug in vavr-jackson, the Jackson module that supports Vavr collections.

https://github.com/vavr-io/vavr-jackson/issues/189
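
One possible workaround, which the thread does not confirm and is offered only as a sketch, is to convert the Vavr list to a `java.util.List` on the sender side. The payload then matches the `ArrayList` case that the answer shows working. The class `TaskAssignmentPublisher` and its `publish` method are hypothetical. The exchange name `outbox` and the empty routing key come from the log in the question.

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

// Hypothetical sender; T stands for the question's TaskAssignment POJO.
@Component
public class TaskAssignmentPublisher {

    private final RabbitTemplate template;

    public TaskAssignmentPublisher(RabbitTemplate template) {
        this.template = template;
    }

    public <T> void publish(io.vavr.collection.List<T> assignments) {
        // toJavaList() copies the Vavr list into a java.util.List, so Jackson
        // serializes a standard collection instead of io.vavr.collection.List$Cons.
        java.util.List<T> payload = assignments.toJavaList();
        template.convertAndSend("outbox", "", payload);
    }
}
```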