Spring AMQP RabbitMQ InboundChannelAdapter and OutboundEndpoint Different Serialization/Deserialization Strategy

I have a class:

public class SomeClass implements Serializable {
    private String name;

    public static SomeClass valueOf(String value) {
        // Here validate and return SomeClass
    }
}

Its serializer and deserializer are:

public class SomeClassSerializer extends JsonSerializer<SomeClass> {

    @Override
    public void serialize(SomeClass someClass, JsonGenerator generator, SerializerProvider provider) throws IOException {
        generator.writeString(someClass.getName());
    }
}

public class SomeClassDeserializer extends JsonDeserializer<SomeClass> {

    @Override
    public SomeClass deserialize(JsonParser parser, DeserializationContext context) throws IOException {
        final String name = ((TextNode) parser.getCodec().readTree(parser)).textValue();
        return SomeClass.valueOf(name);
    }
}

In my Jackson2ObjectMapperBuilder configuration:

builder.serializerByType(SomeClass.class, new SomeClassSerializer());
builder.deserializerByType(SomeClass.class, new SomeClassDeserializer());

Everything works fine when SomeClass appears in the body of a POST/GET request. With Spring Integration, however, I have:

return Amqp
           .inboundAdapter(container)
           .outputChannel(someClassChannel)
           .messageConverter(new Jackson2JsonMessageConverter(objectMapperBuilder.build()))
           .get();

For the outbound side:

@ServiceActivator(inputChannel = "someChannel")
public AmqpOutboundEndpoint someOutboundEndpoint(RabbitTemplate rabbitTemplate,
                                                               FanoutExchange exchange) {
    return Amqp
               .outboundAdapter(rabbitTemplate)
               .exchangeName(exchange.getName())
               .get();
}

With my RabbitTemplate configuration:

@Bean
public RabbitTemplate rabbitTemplate(Jackson2ObjectMapperBuilder objectMapperBuilder){
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(objectMapperBuilder
            .serializerByType(SomeClass.class, new SomeClassSerializer())
            .build()));
    return rabbitTemplate;
}

And my gateway:

@Gateway(requestChannel = "someChannel")
void publishEvent(@Header(value = "someClass") SomeClass someClass, @Payload Object payload);

When I send a message, I get an exception when the valueOf method of SomeClass is called, apparently from SomeClassDeserializer (valueOf throws the IllegalArgumentException):

Full stack trace:

org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@43a251c4]; nested exception is org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [@org.springframework.messaging.handler.annotation.Header com.company.project.domain.package.SomeClass] for value 'com.company.project.domain.package.SomeClass@ec7887e4'; nested exception is java.lang.IllegalArgumentException: Couldn't convert value : com.company.project.domain.package.SomeClass@ec7887e4 into a valid com.company.project.domain.package.SomeClass object, failedMessage=GenericMessage [payload=com.company.project.security.domain.client.Client@55095ce3, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=com.company, amqp_deliveryTag=1, amqp_consumerQueue=com.company.web.192.168.1.34, amqp_redelivered=false, amqp_contentEncoding=UTF-8, json__TypeId__=com.company.project.security.domain.client.Client, amqp_timestamp=Fri Oct 30 12:13:00 EET 2020, amqp_messageId=1dffe404-2235-4f79-4a14-315c896c3c4e, id=bcaa71d5-b454-e65c-6939-b9f07b7cc47b, event=com.company.project.domain.package.SomeClass@ec7887e4, amqp_consumerTag=amq.ctag-ImFjO9LBGoks9Lm3E0EkbQ, contentType=application/json, __TypeId__=com.company.project.security.domain.client.Client, timestamp=1604049180277}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:111)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.access$000(BroadcastingDispatcher.java:56)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.run(BroadcastingDispatcher.java:204)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [@org.springframework.messaging.handler.annotation.Header com.company.project.domain.package.SomeClass] for value 'com.company.project.domain.package.SomeClass@ec7887e4'; nested exception is java.lang.IllegalArgumentException: Couldn't convert value : com.company.project.domain.package.SomeClass@ec7887e4 into a valid com.company.project.domain.package.SomeClass object
    at org.springframework.core.convert.support.ObjectToObjectConverter.convert(ObjectToObjectConverter.java:112)
    at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:41)
    at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:191)
    at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:111)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1096)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:580)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:476)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108)
    ... 10 more
Caused by: java.lang.IllegalArgumentException: Couldn't convert value : com.company.project.domain.package.SomeClass@ec7887e4 into a valid com.company.project.domain.package.SomeClass object
    at com.company.project.domain.package.SomeClass.valueOf(SomeClass.java:148)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.core.convert.support.ObjectToObjectConverter.convert(ObjectToObjectConverter.java:102)

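Because it tries to deserialize SomeClass through the valueOf method, I can see that the correct deserializer is being used. However, the value it tries to deserialize, com.company.project.SomeClass@ec7887e4, is not the output of the serializer I provided (even though I supplied the required serializer to the Jackson2ObjectMapperBuilder in the RabbitTemplate configuration). Is this a bug, or am I doing something wrong?

Update:

Here is another unexpected behavior:

I temporarily changed SomeClassDeserializer to: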

public class SomeClassDeserializer extends JsonDeserializer<SomeClass> {

    @Override
    public SomeClass deserialize(JsonParser parser, DeserializationContext context) throws IOException {
        final String name = ((TextNode) parser.getCodec().readTree(parser)).textValue();
        return new SomeClass(name);
    }
}

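The valueOf method on SomeClass kept being called instead of new SomeClass(name) in the new SomeClassDeserializer. Only when I renamed the static method from valueOf to valueOfName did my new implementation start being called.

Since the SomeClass constructor performs no validation, the exception went away, although this time it started deserializing to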

new SomeClass("com.company.project.domain.package.SomeClass@ec7887e4")

instead of

new SomeClass("myActualValue")
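
The stack trace above reaches SomeClass.valueOf through ObjectToObjectConverter and Method.invoke rather than through Jackson, which would be consistent with the rename changing the behavior. The following is a minimal sketch of that kind of reflective String-to-object conversion. It is not Spring's code: the class names, the createdBy field, and the lookup order (static valueOf(String) first, then a String constructor) are assumptions made for illustration.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

final class StringToObjectSketch {

    /** Hypothetical type exposing both a static valueOf(String) and a String constructor. */
    static final class WithFactory {
        final String name;
        final String createdBy;
        WithFactory(String name) { this(name, "constructor"); }
        private WithFactory(String name, String createdBy) { this.name = name; this.createdBy = createdBy; }
        static WithFactory valueOf(String value) { return new WithFactory(value, "valueOf"); }
    }

    /** Same type after renaming the factory: only the String constructor matches the lookup. */
    static final class RenamedFactory {
        final String name;
        final String createdBy;
        RenamedFactory(String name) { this(name, "constructor"); }
        private RenamedFactory(String name, String createdBy) { this.name = name; this.createdBy = createdBy; }
        static RenamedFactory valueOfName(String value) { return new RenamedFactory(value, "valueOfName"); }
    }

    /** Assumed lookup order: static valueOf(String) first, otherwise a constructor taking a String. */
    static <T> T convert(String source, Class<T> target) {
        try {
            Method factory = findStaticValueOf(target);
            if (factory != null) {
                return target.cast(factory.invoke(null, source));
            }
            Constructor<T> constructor = target.getDeclaredConstructor(String.class);
            return constructor.newInstance(source);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("No String conversion for " + target.getName(), e);
        }
    }

    private static Method findStaticValueOf(Class<?> target) {
        try {
            Method method = target.getDeclaredMethod("valueOf", String.class);
            return Modifier.isStatic(method.getModifiers()) ? method : null;
        } catch (NoSuchMethodException e) {
            return null; // no valueOf(String): the caller falls back to the constructor
        }
    }

    public static void main(String[] args) {
        System.out.println(convert("x", WithFactory.class).createdBy);    // valueOf
        System.out.println(convert("x", RenamedFactory.class).createdBy); // constructor
    }
}
```

Under these assumptions, neither path involves the Jackson deserializer at all, so swapping the deserializer body would not be expected to change which method runs.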

Couldn't convert value : com.company.project.domain.package.SomeClass@ec7887e4 into a valid com.company.project.domain.package.SomeClass

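It looks like just the class name is being sent as a string.

com.company.project.domain.package.SomeClass@ec7887e4

That is the default toString() implementation on Object.

Perhaps SomeClass.getName() is being used rather than someClass.getName() (assuming the class has a getName() method).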
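
To make the toString() observation concrete, here is a minimal sketch in plain Java. PlainValue and NamedValue are hypothetical stand-ins for SomeClass, not the asker's code; the only difference between them is whether toString() is overridden.

```java
final class ToStringHeaderDemo {

    /** No toString() override, so Object.toString() is inherited. */
    static final class PlainValue {
        private final String name;
        PlainValue(String name) { this.name = name; }
        String getName() { return name; }
    }

    /** Overrides toString() to return the wrapped name. */
    static final class NamedValue {
        private final String name;
        NamedValue(String name) { this.name = name; }
        @Override
        public String toString() { return name; }
    }

    public static void main(String[] args) {
        // Prints the class name, '@', and a hex hash code; the hash varies between runs.
        System.out.println(new PlainValue("myActualValue"));
        // Prints the wrapped name.
        System.out.println(new NamedValue("myActualValue"));
    }
}
```

Anything that stringifies a PlainValue without knowing about getName() ends up with the ClassName@hash form seen in the exception message. Whether overriding toString() is an acceptable fix in the real application is a separate design question.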

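Edit

I didn't notice it was a header, sorry.

The message converter only converts the payload.

For headers, you need to implement a custom HeaderMapper. Spring simply passes the headers to amqp-client, which calls toString() on types it does not know about.

Subclass DefaultAmqpHeaderMapper and override populateUserDefinedHeader on the outbound side and extractUserDefinedHeaders on the inbound side.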
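
The conversion those two overrides would perform can be sketched without Spring. The following is plain Java and deliberately does not reproduce the DefaultAmqpHeaderMapper method signatures. The method names toWireHeaders and fromWireHeaders, the validation inside valueOf, and this SomeClass stand-in are all assumptions for illustration; only the header name "someClass" comes from the gateway above.

```java
import java.util.HashMap;
import java.util.Map;

final class HeaderMappingSketch {

    static final String HEADER = "someClass"; // header name used by the gateway

    /** Hypothetical stand-in for the asker's SomeClass. */
    static final class SomeClass {
        private final String name;
        private SomeClass(String name) { this.name = name; }
        static SomeClass valueOf(String value) {
            // Assumed validation; the real rules are not shown in the question.
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Couldn't convert value : " + value);
            }
            return new SomeClass(value);
        }
        String getName() { return name; }
    }

    /** Outbound side: replace a SomeClass header with its name before it is sent. */
    static Map<String, Object> toWireHeaders(Map<String, Object> headers) {
        Map<String, Object> wire = new HashMap<>();
        headers.forEach((key, value) ->
                wire.put(key, value instanceof SomeClass ? ((SomeClass) value).getName() : value));
        return wire;
    }

    /** Inbound side: rebuild the SomeClass header from the received string. */
    static Map<String, Object> fromWireHeaders(Map<String, Object> wire) {
        Map<String, Object> headers = new HashMap<>(wire);
        Object raw = headers.get(HEADER);
        if (raw instanceof String) {
            headers.put(HEADER, SomeClass.valueOf((String) raw));
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(HEADER, SomeClass.valueOf("myActualValue"));
        Map<String, Object> wire = toWireHeaders(headers);
        System.out.println(wire.get(HEADER)); // myActualValue
        SomeClass restored = (SomeClass) fromWireHeaders(wire).get(HEADER);
        System.out.println(restored.getName()); // myActualValue
    }
}
```

In the real application the outbound half would presumably live in the populateUserDefinedHeader override and the inbound half in extractUserDefinedHeaders, so that the header crosses the broker as the plain name instead of the default toString() output.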