Spring AMQP RabbitMQ InboundChannelAdapter and OutboundEndpoint Different Serialization/Deserialization Strategy
I have a class:
public class SomeClass implements Serializable {
    private String name;

    public static SomeClass valueOf(String value) {
        // Here validate and return SomeClass
    }
}
The serializer and deserializer are:
public class SomeClassSerializer extends JsonSerializer<SomeClass> {
    @Override
    public void serialize(SomeClass someClass, JsonGenerator generator, SerializerProvider provider) throws IOException {
        generator.writeString(someClass.getName());
    }
}

public class SomeClassDeserializer extends JsonDeserializer<SomeClass> {
    @Override
    public SomeClass deserialize(JsonParser parser, DeserializationContext context) throws IOException {
        final String name = ((TextNode) parser.getCodec().readTree(parser)).textValue();
        return SomeClass.valueOf(name);
    }
}
In my Jackson2ObjectMapperBuilder configuration:
builder.serializerByType(SomeClass.class, new SomeClassSerializer());
builder.deserializerByType(SomeClass.class, new SomeClassDeserializer());
When SomeClass is in the body of a POST/GET request, everything works fine. However, with Spring Integration, I have:
return Amqp
        .inboundAdapter(container)
        .outputChannel(someClassChannel)
        .messageConverter(new Jackson2JsonMessageConverter(objectMapperBuilder.build()))
        .get();
For outbound:
@ServiceActivator(inputChannel = "someChannel")
public AmqpOutboundEndpoint someOutboundEndpoint(RabbitTemplate rabbitTemplate,
                                                 FanoutExchange exchange) {
    return Amqp
            .outboundAdapter(rabbitTemplate)
            .exchangeName(exchange.getName())
            .get();
}
With my RabbitTemplate configuration:
@Bean
public RabbitTemplate rabbitTemplate(Jackson2ObjectMapperBuilder objectMapperBuilder) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(objectMapperBuilder
            .serializerByType(SomeClass.class, new SomeClassSerializer())
            .build()));
    return rabbitTemplate;
}
And my Gateway:
@Gateway(requestChannel = "someChannel")
void publishEvent(@Header(value = "someClass") SomeClass someClass, @Payload Object payload);
When I send a message, I get an exception when SomeClassDeserializer calls the valueOf method of SomeClass (I throw an IllegalArgumentException).
Full stack trace:
org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@43a251c4]; nested exception is org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [@org.springframework.messaging.handler.annotation.Header com.company.project.domain.package.SomeClass] for value 'com.company.project.domain.package.SomeClass@ec7887e4'; nested exception is java.lang.IllegalArgumentException: Couldn't convert value : com.company.project.domain.package.SomeClass@ec7887e4 into a valid com.company.project.domain.package.SomeClass object, failedMessage=GenericMessage [payload=com.company.project.security.domain.client.Client@55095ce3, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=com.company, amqp_deliveryTag=1, amqp_consumerQueue=com.company.web.192.168.1.34, amqp_redelivered=false, amqp_contentEncoding=UTF-8, json__TypeId__=com.company.project.security.domain.client.Client, amqp_timestamp=Fri Oct 30 12:13:00 EET 2020, amqp_messageId=1dffe404-2235-4f79-4a14-315c896c3c4e, id=bcaa71d5-b454-e65c-6939-b9f07b7cc47b, event=com.company.project.domain.package.SomeClass@ec7887e4, amqp_consumerTag=amq.ctag-ImFjO9LBGoks9Lm3E0EkbQ, contentType=application/json, __TypeId__=com.company.project.security.domain.client.Client, timestamp=1604049180277}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:111)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.access$000(BroadcastingDispatcher.java:56)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.run(BroadcastingDispatcher.java:204)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [@org.springframework.messaging.handler.annotation.Header com.company.project.domain.package.SomeClass] for value 'com.company.project.domain.package.SomeClass@ec7887e4'; nested exception is java.lang.IllegalArgumentException: Couldn't convert value : com.company.project.domain.package.SomeClass@ec7887e4 into a valid com.company.project.domain.package.SomeClass object
at org.springframework.core.convert.support.ObjectToObjectConverter.convert(ObjectToObjectConverter.java:112)
at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:41)
at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:191)
at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:111)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1096)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:580)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:476)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108)
... 10 more
Caused by: java.lang.IllegalArgumentException: Couldn't convert value : com.company.project.domain.package.SomeClass@ec7887e4 into a valid com.company.project.domain.package.SomeClass object
at com.company.project.domain.package.SomeClass.valueOf(SomeClass.java:148)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.core.convert.support.ObjectToObjectConverter.convert(ObjectToObjectConverter.java:102)
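Because it tries to deserialize SomeClass through the valueOf method, I can see that the correct deserializer is being used. However, the value it tries to deserialize, com.company.project.SomeClass@ec7887e4, is not the output of the serializer I provided (even though I supplied the required serializer to the Jackson2ObjectMapperBuilder in the RabbitTemplate configuration). Is this a bug, or am I doing something wrong?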
Update:
Here is another unexpected behavior. I temporarily switched SomeClassDeserializer to:
public class SomeClassDeserializer extends JsonDeserializer<SomeClass> {
    @Override
    public SomeClass deserialize(JsonParser parser, DeserializationContext context) throws IOException {
        final String name = ((TextNode) parser.getCodec().readTree(parser)).textValue();
        return new SomeClass(name);
    }
}
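and the valueOf method on SomeClass kept being called instead of new SomeClass(name) in the new SomeClassDeserializer. Only once I renamed the static method from valueOf to valueOfName did my actual new implementation start being called.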
Since the constructor of SomeClass has no validation, no exception was thrown, although this time it deserialized to
new SomeClass("com.company.project.domain.package.SomeClass@ec7887e4")
instead of
new SomeClass("myActualValue")
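A note on the observation above: the stack trace shows SomeClass.valueOf being invoked reflectively from Spring's ObjectToObjectConverter, not from Jackson, so the rename most likely changed which reflective entry point was found. The following is a simplified sketch of that kind of lookup (static valueOf(String) first, then a String constructor). It is not Spring's implementation, and Tag, RenamedTag, and ReflectiveStringConverter are hypothetical names used only for illustration.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical type that has both a static valueOf(String) and a String constructor.
class Tag {
    final String name;
    final String createdBy;

    Tag(String name) { this(name, "constructor"); }

    private Tag(String name, String createdBy) {
        this.name = name;
        this.createdBy = createdBy;
    }

    static Tag valueOf(String name) { return new Tag(name, "valueOf"); }
}

// Same shape, but the factory is renamed, so the lookup below no longer finds it.
class RenamedTag {
    final String name;
    final String createdBy;

    RenamedTag(String name) { this(name, "constructor"); }

    private RenamedTag(String name, String createdBy) {
        this.name = name;
        this.createdBy = createdBy;
    }

    static RenamedTag valueOfName(String name) { return new RenamedTag(name, "valueOfName"); }
}

class ReflectiveStringConverter {
    // Prefers a static valueOf(String); otherwise falls back to a String constructor.
    static <T> T convert(String source, Class<T> target) {
        try {
            try {
                Method factory = target.getDeclaredMethod("valueOf", String.class);
                if (Modifier.isStatic(factory.getModifiers())) {
                    factory.setAccessible(true);
                    return target.cast(factory.invoke(null, source));
                }
            } catch (NoSuchMethodException ignored) {
                // No valueOf(String): fall through to the constructor.
            }
            Constructor<T> constructor = target.getDeclaredConstructor(String.class);
            constructor.setAccessible(true);
            return constructor.newInstance(source);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot convert to " + target.getName(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(convert("a", Tag.class).createdBy);        // valueOf
        System.out.println(convert("a", RenamedTag.class).createdBy); // constructor
    }
}
```

Under these assumptions, renaming the factory makes the constructor the entry point, which matches the behavior described in the update without the Jackson deserializer being involved at all.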
Couldn't convert value : com.company.project.domain.package.SomeClass@ec7887e4 into a valid com.company.project.domain.package.SomeClass
It looks like just the class name is being sent as a String.
com.company.project.domain.package.SomeClass@ec7887e4
That is the default toString() implementation on Object.
That is, SomeClass.getName() rather than someClass.getName() (assuming the class has a getName() method).
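To make the toString() point concrete, here is a small sketch showing what a class that does not override toString() turns into when something stringifies it. HeaderValue and ToStringDemo are hypothetical names standing in for SomeClass and for whatever component does the stringifying.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in for SomeClass; note that it does not override toString().
final class HeaderValue {
    private final String name;

    HeaderValue(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }
}

class ToStringDemo {
    // Object.toString() returns getClass().getName() + "@" + Integer.toHexString(hashCode()).
    static final Pattern DEFAULT_FORM = Pattern.compile(".*HeaderValue@[0-9a-f]+");

    // Simulates a component that stringifies a value of a type it does not know.
    static String stringify(Object value) {
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        HeaderValue value = new HeaderValue("myActualValue");
        System.out.println(stringify(value));  // class name, '@', hex hash code
        System.out.println(value.getName());   // myActualValue
    }
}
```

The first line printed has the same shape as the value in the error message, while getName() returns the string the custom serializer would have written.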
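Edit
I didn't notice it was a header, sorry.
The message converter only converts the payload.
For headers, you need to implement a custom HeaderMapper. Spring simply passes the headers to the amqp-client, which calls toString() on types it does not know.
Subclass DefaultAmqpHeaderMapper and override populateUserDefinedHeader on the outbound side and extractUserDefinedHeaders on the inbound side.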
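The conversion those two overrides would need to perform can be sketched in plain Java, without the Spring classes. This is only an illustration of the idea under stated assumptions: EventType stands in for SomeClass, HeaderMappingSketch, toWire, and fromWire are hypothetical names, and the real method signatures of DefaultAmqpHeaderMapper are not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SomeClass with a validating factory.
final class EventType {
    private final String name;

    private EventType(String name) {
        this.name = name;
    }

    static EventType valueOf(String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Couldn't convert value : " + value);
        }
        return new EventType(value);
    }

    String getName() {
        return name;
    }
}

class HeaderMappingSketch {
    static final String HEADER = "someClass";

    // Outbound side: replace the object with a String the broker client can carry as-is.
    static Map<String, Object> toWire(Map<String, Object> headers) {
        Map<String, Object> wire = new HashMap<>(headers);
        Object value = wire.get(HEADER);
        if (value instanceof EventType) {
            wire.put(HEADER, ((EventType) value).getName());
        }
        return wire;
    }

    // Inbound side: turn the String header back into the domain type.
    static Map<String, Object> fromWire(Map<String, Object> wire) {
        Map<String, Object> headers = new HashMap<>(wire);
        Object value = headers.get(HEADER);
        if (value instanceof String) {
            headers.put(HEADER, EventType.valueOf((String) value));
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(HEADER, EventType.valueOf("myActualValue"));

        Map<String, Object> wire = toWire(headers);
        System.out.println(wire.get(HEADER));  // myActualValue

        EventType restored = (EventType) fromWire(wire).get(HEADER);
        System.out.println(restored.getName());  // myActualValue
    }
}
```

In a real application the body of toWire would belong in the outbound override and the body of fromWire in the inbound one, so the header travels as the name rather than as the default toString() output.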