Spring 引导和 Spring AMQP RPC - 未找到转换异常的转换器
Spring Boot and Spring AMQP RPC - No converter found to convert exception
我有几个使用 Spring 通过 RabbitMQ 引导和 RPC 的教程。但是,一旦我尝试添加 Jackson JSON 消息转换器,它就会崩溃。
服务器成功接收到远程调用,所以我相信这不是客户端配置。
Exchange DATAFLOW_EXCHANGE
Routing Key dataflowRunner
Redelivered ○
Properties
reply_to: amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAAr0wAAAAAB.MmIZ6Htejtc1qB11G7BBQw==
priority: 0
delivery_mode: 2
headers:
__TypeId__: org.springframework.remoting.support.RemoteInvocation
content_encoding: UTF-8
content_type: application/json
Payload
675 bytes
Encoding: string
{"methodName":"run","parameterTypes":["dw.dataflow.Dataflow"],"arguments":[{ Valid Dataflow JSON Removed for Brevity } ]}
但是,输出如下异常:
Caused by: org.springframework.messaging.converter.MessageConversionException:
No converter found to convert to class dw.dataflow.Dataflow, message=GenericMessage
[payload=RemoteInvocation: method name 'run'; parameter types [dw.dataflow.Dataflow], headers={amqp_receivedExchange=DATAFLOW_EXCHANGE, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAArRAAAAAQC.PA/bJ6lcUfaP3csAP5v5NA==, amqp_consumerQueue=DATAFLOW_QUEUE, amqp_redelivered=false, amqp_receivedRoutingKey=dataflowRunner, amqp_contentEncoding=UTF-8, amqp_deliveryMode=PERSISTENT, id=adb37c77-c0da-16bd-8df4-b739cfddf89f, amqp_consumerTag=amq.ctag-N_tFCc_Hp9UtQkiXl7FZ8g, contentType=application/json, __TypeId__=org.springframework.remoting.support.RemoteInvocation, timestamp=1462560945203}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:118)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:98)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:112)
... 12 common frames omitted
所以,在交付时,它知道它应该是一个 dw.dataflow.Dataflow 对象,只是找不到转换器。但是,我在任何地方都定义了我的转换器。
服务器配置
@Configuration
@EnableRabbit
public class RabbitListenerConfiguration {
@Autowired
ConnectionFactory connectionFactory;
@Autowired
ObjectMapper jacksonObjectMapper;
@Bean
public TopicExchange exchange() {
return new TopicExchange("DATAFLOW_EXCHANGE", true, false);
}
@Bean
public Queue queue() {
return new Queue("DATAFLOW_QUEUE", true);
}
@Bean
public AmqpInvokerServiceExporter amqpInvokerServiceExporter() {
AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter() ;
exporter.setAmqpTemplate(rabbitTemplate());
exporter.setMessageConverter(jackson2JsonMessageConverter());
exporter.setServiceInterface(DataflowRunner.class);
exporter.setService(dataflowRunner());
return exporter ;
}
@Bean
public DataflowRunner dataflowRunner() {
return new DataflowRunnerServerImpl();
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setJsonObjectMapper(jacksonObjectMapper);
return converter;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jackson2JsonMessageConverter());
return template;
}
@Bean(name="rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setDefaultRequeueRejected(false);
return factory;
}
这是服务接口:
public interface DataflowRunner {
String run(Dataflow dataflow) throws Exception;
}
及具体实现:
public class DataflowRunnerServerImpl implements DataflowRunner {
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues="DATAFLOW_QUEUE")
public String run(Dataflow dataflow) throws Exception {
// SNIP
}
为了笑和咯咯笑,我也尝试用以下注释配置服务器实现class,但它有同样的错误:
@RabbitHandler
@RabbitListener(
bindings = @QueueBinding(key = "dataflowRunner",
value = @Queue(value = "DATAFLOW_QUEUE", durable = "true", autoDelete = "false", exclusive = "false"),
exchange = @Exchange(value = "DATAFLOW_EXCHANGE", durable = "true", autoDelete = "false", type = "topic")) )
public String run(Dataflow dataflow) throws Exception {
客户端配置
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
connectionFactory.setUsername(rabbitUser);
connectionFactory.setPassword(rabbitPassword);
connectionFactory.setAddresses(rabbitAddresses);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jackson2MessageConverter());
return template;
}
有什么配置不正确吗?我错过了什么?我在服务导出器和侦听器容器工厂上设置了转换器。
任何帮助and/or 想法表示赞赏。
@RabbitListener
不打算与服务导出器一起使用 - 只是一个普通的 Java class.
对于 Spring RPC 远程处理,服务导出器是 MessageListener
对于 SimpleMessageListenerContainer
。
对于 @RabbitListener
,有一个特殊的监听器适配器包装了 pojo 方法。
所以你似乎混合了两种不同的范式。
ServiceExporter
(Spring 远程处理)预计将与客户端的 AmqpProxyFactoryBean
配对,服务导出器作为服务器端的侦听器。
对于简单的 POJO RPC(比使用 Spring Remoting over RabbitMQ 更新得多),在客户端使用 @RabbitListener
和 RabbitTemplate.convertSendAndReceive()
。摆脱 PFB 和 SE。
您能否解释一下是什么导致您走这条路,以防我们需要对文档进行一些说明。
编辑
如果你想要使用Spring远程处理(在客户端注入一个接口并让它"magically"在服务器端调用一个服务),你需要摆脱所有容器工厂的东西,只需连接一个 SimpleMessageListenerContainer
并将服务导出器注入为 MessageListener
.
参考手册有 an XML example 但您可以将 SMLC 连接为 @Bean
。
EDIT2
我有 运行 一些测试和 Spring 通过 AMQP 的远程处理不适用于 JSON 因为顶级对象是 RemoteInvocation
- 而消息转换器可以重新创建该对象,它没有关于实际参数的类型信息,因此将其保留为链接的哈希映射。
现在,如果您必须使用 JSON,模板 convertSendAndReceive
与 @RabbitListener
结合使用是可行的方法。我将打开一个 JIRA 问题,看看我们是否可以使用 Spring 远程 RPC 和 JSON 解决问题,但它实际上是为 Java 序列化而设计的。
我在这上面花了几分钟,我设法用一个似乎有效的可怕 hack 解决了这个问题。
我基本上扩展了双方调用中涉及的 classes 以确保内部参数和值被转换为 to/from JSON 字符串。
只要多一点爱,这可以改进以使用其他转换器处理其他数据类型,但我没有时间这样做。如果你有足够的勇气尝试一下,我就把它留给你了:-)
在服务器端
首先,我对 AmqpInvokerServiceExporter
进行了子class,以便能够添加对转换 to/from JSON 对象的支持。第一步是将方法参数从 JSON 转换为相应的类型。第二步是将对象中的 returned 值转换为相应的 JSON 字符串以将其发回。
public class JSONAmqpInvokerServiceExporter extends AmqpInvokerServiceExporter {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void onMessage(Message message) {
Address replyToAddress = message.getMessageProperties().getReplyToAddress();
if (replyToAddress == null) {
throw new AmqpRejectAndDontRequeueException("No replyToAddress in inbound AMQP Message");
}
Object invocationRaw = getMessageConverter().fromMessage(message);
RemoteInvocationResult remoteInvocationResult;
if (invocationRaw == null || !(invocationRaw instanceof RemoteInvocation)) {
remoteInvocationResult = new RemoteInvocationResult(
new IllegalArgumentException("The message does not contain a RemoteInvocation payload"));
}
else {
RemoteInvocation invocation = (RemoteInvocation) invocationRaw;
int argCount = invocation.getArguments().length;
if (argCount > 0) {
Object[] arguments = invocation.getArguments();
Class<?>[] parameterTypes = invocation.getParameterTypes();
for (int i = 0; i < argCount; i++) {
try {
//convert arguments from JSON strings to objects
arguments[i] = objectMapper.readValue(arguments[i].toString(), parameterTypes[i]);
}
catch (IOException cause) {
throw new MessageConversionException(
"Failed to convert JSON to value: " + arguments[i] + " of type" + parameterTypes[i], cause);
}
}
}
remoteInvocationResult = invokeAndCreateResult(invocation, getService());
}
send(remoteInvocationResult, replyToAddress);
}
private void send(RemoteInvocationResult result, Address replyToAddress) {
Object value = result.getValue();
if (value != null) {
try {
//convert the returning value from a model to a JSON string
//before we send it back
Object json = objectMapper.writeValueAsString(value);
result.setValue(json);
}
catch (JsonProcessingException cause) {
throw new MessageConversionException("Failed to convert value to JSON: " + value, cause);
}
}
Message message = getMessageConverter().toMessage(result, new MessageProperties());
getAmqpTemplate().send(replyToAddress.getExchangeName(), replyToAddress.getRoutingKey(), message);
}
}
现在,根据这个 class 定义,我将服务侦听器的定义更改为如下内容:
<bean id="toteServiceListener" class="amqphack.FFDAmqpInvokerServiceExporter">
<property name="serviceInterface" value="ampqphack.ToteService"/>
<property name="service" ref="defaultToteService"/>
<property name="amqpTemplate" ref="rabbitTemplate"/>
</bean>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="toteServiceListener" queue-names="tote-service"/>
</rabbit:listener-container>
在这种情况下我使用了常规的 AmqTemplate
,因为我知道 ResultInvocationValue 总是会转换为 JSON 字符串,所以我不介意 InvocationResult 是否使用传统的序列化Java连载。
在客户端
在客户端我不得不改变一些东西。首先,我需要在调用之前将我们发送到的任何参数转换为 JSON 字符串,但我们仍然保留它们的参数类型。幸运的是,现有的 AmqpProxyFactoryBean
接受一个 remoteInvocationFactory
参数,我们可以在其中拦截调用并更改它。所以我首先定义并新建 RemoteInvocationFactory
:
public class JSONRemoteInvocationFactory implements RemoteInvocationFactory {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) {
RemoteInvocation invocation = new RemoteInvocation(methodInvocation);
if (invocation.getParameterTypes() != null) {
int paramCount = invocation.getParameterTypes().length;
Object[] arguments = new Object[paramCount];
try {
for (int i = 0; i < paramCount; i++) {
arguments[i] = mapper.writeValueAsString(invocation.getArguments()[i]);
}
invocation.setArguments(arguments);
}
catch (JsonProcessingException cause) {
throw new RuntimeException(
"Failed converting arguments to json: " + Arrays.toString(invocation.getArguments()), cause);
}
}
return invocation;
}
}
但这还不够。当我们得到结果时,我们需要将它的结果再次转回一个 Java 对象。为此,我们可以使用预期的 return 类型的服务接口。为此,我扩展了 exist AmqpProxyFactoryBean
以简单地将其结果(我知道它永远是一个字符串)转换为 Java 模型。
public class JSONAmqpProxyFactoryBean extends AmqpProxyFactoryBean {
private final ObjectMapper mapper = DefaultObjectMapper.createDefaultObjectMapper();
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Object ret = super.invoke(invocation);
return mapper.readValue(ret.toString(), invocation.getMethod().getReturnType());
}
}
有了这个,我可以像这样定义我的客户端:
<bean id="toteService" class="amqphack.JSONAmqpProxyFactoryBean">
<property name="amqpTemplate" ref="rabbitTemplate"/>
<property name="serviceInterface" value="amqphack.ToteService"/>
<property name="routingKey" value="tote-service"/>
<property name="remoteInvocationFactory" ref="remoteInvocationFactory"/>
</bean>
在此之后,一切都像一个魅力:
ToteService toteService = context.getBean("toteService", ToteService.class);
ToteModel tote = toteService.findTote("18251", "ABCD");
由于我没有更改传统转换器,这意味着异常仍然在 InvocationResult
.
中正确序列化
不知道它是否仍然需要,但这就是我解决使用 JSON 和 AmqpProxyFactoryBean
/ AmqpInvokerServiceExporter
的问题的方法。在客户端我使用 Jackson2JsonMessageConverter
转换器,在服务器端使用 RemoteInvocationAwareMessageConverterAdapter
包装 Jackson2JsonMessageConverter
转换器。
ClientConfig.java
:
import com.stayfriends.commons.services.interfaces.GameService;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.remoting.client.AmqpProxyFactoryBean;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ClientConfig {
@Bean
public RabbitTemplate gameServiceTemplate(ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter messageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setExchange("rpc");
template.setMessageConverter(messageConverter);
return template;
}
@Bean
public ServiceAmqpProxyFactoryBean gameServiceProxy2(@Qualifier("gameServiceTemplate") RabbitTemplate template) {
return new ServiceAmqpProxyFactoryBean(template);
}
public static class ServiceAmqpProxyFactoryBean implements FactoryBean<Service>, InitializingBean {
private final AmqpProxyFactoryBean proxy;
ServiceAmqpProxyFactoryBean(RabbitTemplate template) {
proxy = new AmqpProxyFactoryBean();
proxy.setAmqpTemplate(template);
proxy.setServiceInterface(GameService.class);
proxy.setRoutingKey(GameService.class.getSimpleName());
}
@Override
public void afterPropertiesSet() {
proxy.afterPropertiesSet();
}
@Override
public Service getObject() throws Exception {
return (Service) proxy.getObject();
}
@Override
public Class<?> getObjectType() {
return Service.class;
}
@Override
public boolean isSingleton() {
return proxy.isSingleton();
}
}
}
ServerConfig.java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter;
import org.springframework.amqp.support.converter.RemoteInvocationAwareMessageConverterAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ServerConfig {
@Bean
public DirectExchange serviceExchange() {
return new DirectExchange("rpc");
}
@Bean
public Queue serviceQueue() {
return new Queue(Service.class.getSimpleName());
}
@Bean
public Binding binding(@Qualifier("serviceQueue") Queue queue, @Qualifier("serviceExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(Service.class.getSimpleName()).noargs();
}
@Bean("remoteInvocationAwareMessageConverter")
@Primary
public RemoteInvocationAwareMessageConverterAdapter remoteInvocationAwareMessageConverterAdapter(
Jackson2JsonMessageConverter jsonMessageConverter) {
return new RemoteInvocationAwareMessageConverterAdapter(jsonMessageConverter);
}
@Bean
public AmqpInvokerServiceExporter exporter(RabbitTemplate template, ServiceImpl service,
RemoteInvocationAwareMessageConverterAdapter messageConverter) {
AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter();
exporter.setAmqpTemplate(template);
exporter.setService(service);
exporter.setServiceInterface(Service.class);
exporter.setMessageConverter(messageConverter);
return exporter;
}
@Bean
public MessageListenerContainer container(ConnectionFactory connectionFactory,
@Qualifier("serviceQueue") Queue queue,
AmqpInvokerServiceExporter exporter) {
DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
container.setQueues(queue);
container.setMessageListener(exporter);
container.setConsumersPerQueue(5);
return container;
}
}
我有几个使用 Spring 通过 RabbitMQ 引导和 RPC 的教程。但是,一旦我尝试添加 Jackson JSON 消息转换器,它就会崩溃。
服务器成功接收到远程调用,所以我相信这不是客户端配置。
Exchange DATAFLOW_EXCHANGE
Routing Key dataflowRunner
Redelivered ○
Properties
reply_to: amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAAr0wAAAAAB.MmIZ6Htejtc1qB11G7BBQw==
priority: 0
delivery_mode: 2
headers:
__TypeId__: org.springframework.remoting.support.RemoteInvocation
content_encoding: UTF-8
content_type: application/json
Payload
675 bytes
Encoding: string
{"methodName":"run","parameterTypes":["dw.dataflow.Dataflow"],"arguments":[{ Valid Dataflow JSON Removed for Brevity } ]}
但是,输出如下异常:
Caused by: org.springframework.messaging.converter.MessageConversionException:
No converter found to convert to class dw.dataflow.Dataflow, message=GenericMessage
[payload=RemoteInvocation: method name 'run'; parameter types [dw.dataflow.Dataflow], headers={amqp_receivedExchange=DATAFLOW_EXCHANGE, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAArRAAAAAQC.PA/bJ6lcUfaP3csAP5v5NA==, amqp_consumerQueue=DATAFLOW_QUEUE, amqp_redelivered=false, amqp_receivedRoutingKey=dataflowRunner, amqp_contentEncoding=UTF-8, amqp_deliveryMode=PERSISTENT, id=adb37c77-c0da-16bd-8df4-b739cfddf89f, amqp_consumerTag=amq.ctag-N_tFCc_Hp9UtQkiXl7FZ8g, contentType=application/json, __TypeId__=org.springframework.remoting.support.RemoteInvocation, timestamp=1462560945203}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:118)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:98)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:112)
... 12 common frames omitted
所以,在交付时,它知道它应该是一个 dw.dataflow.Dataflow 对象,只是找不到转换器。但是,我在任何地方都定义了我的转换器。
服务器配置
@Configuration
@EnableRabbit
public class RabbitListenerConfiguration {
@Autowired
ConnectionFactory connectionFactory;
@Autowired
ObjectMapper jacksonObjectMapper;
@Bean
public TopicExchange exchange() {
return new TopicExchange("DATAFLOW_EXCHANGE", true, false);
}
@Bean
public Queue queue() {
return new Queue("DATAFLOW_QUEUE", true);
}
@Bean
public AmqpInvokerServiceExporter amqpInvokerServiceExporter() {
AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter() ;
exporter.setAmqpTemplate(rabbitTemplate());
exporter.setMessageConverter(jackson2JsonMessageConverter());
exporter.setServiceInterface(DataflowRunner.class);
exporter.setService(dataflowRunner());
return exporter ;
}
@Bean
public DataflowRunner dataflowRunner() {
return new DataflowRunnerServerImpl();
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setJsonObjectMapper(jacksonObjectMapper);
return converter;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jackson2JsonMessageConverter());
return template;
}
@Bean(name="rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setDefaultRequeueRejected(false);
return factory;
}
这是服务接口:
public interface DataflowRunner {
String run(Dataflow dataflow) throws Exception;
}
及具体实现:
public class DataflowRunnerServerImpl implements DataflowRunner {
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues="DATAFLOW_QUEUE")
public String run(Dataflow dataflow) throws Exception {
// SNIP
}
为了笑和咯咯笑,我也尝试用以下注释配置服务器实现class,但它有同样的错误:
@RabbitHandler
@RabbitListener(
bindings = @QueueBinding(key = "dataflowRunner",
value = @Queue(value = "DATAFLOW_QUEUE", durable = "true", autoDelete = "false", exclusive = "false"),
exchange = @Exchange(value = "DATAFLOW_EXCHANGE", durable = "true", autoDelete = "false", type = "topic")) )
public String run(Dataflow dataflow) throws Exception {
客户端配置
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
connectionFactory.setUsername(rabbitUser);
connectionFactory.setPassword(rabbitPassword);
connectionFactory.setAddresses(rabbitAddresses);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jackson2MessageConverter());
return template;
}
有什么配置不正确吗?我错过了什么?我在服务导出器和侦听器容器工厂上设置了转换器。
任何帮助and/or 想法表示赞赏。
@RabbitListener
不打算与服务导出器一起使用 - 只是一个普通的 Java class.
对于 Spring RPC 远程处理,服务导出器是 MessageListener
对于 SimpleMessageListenerContainer
。
对于 @RabbitListener
,有一个特殊的监听器适配器包装了 pojo 方法。
所以你似乎混合了两种不同的范式。
ServiceExporter
(Spring 远程处理)预计将与客户端的 AmqpProxyFactoryBean
配对,服务导出器作为服务器端的侦听器。
对于简单的 POJO RPC(比使用 Spring Remoting over RabbitMQ 更新得多),在客户端使用 @RabbitListener
和 RabbitTemplate.convertSendAndReceive()
。摆脱 PFB 和 SE。
您能否解释一下是什么导致您走这条路,以防我们需要对文档进行一些说明。
编辑
如果你想要使用Spring远程处理(在客户端注入一个接口并让它"magically"在服务器端调用一个服务),你需要摆脱所有容器工厂的东西,只需连接一个 SimpleMessageListenerContainer
并将服务导出器注入为 MessageListener
.
参考手册有 an XML example 但您可以将 SMLC 连接为 @Bean
。
EDIT2
我有 运行 一些测试和 Spring 通过 AMQP 的远程处理不适用于 JSON 因为顶级对象是 RemoteInvocation
- 而消息转换器可以重新创建该对象,它没有关于实际参数的类型信息,因此将其保留为链接的哈希映射。
现在,如果您必须使用 JSON,模板 convertSendAndReceive
与 @RabbitListener
结合使用是可行的方法。我将打开一个 JIRA 问题,看看我们是否可以使用 Spring 远程 RPC 和 JSON 解决问题,但它实际上是为 Java 序列化而设计的。
我在这上面花了几分钟,我设法用一个似乎有效的可怕 hack 解决了这个问题。
我基本上扩展了双方调用中涉及的 classes 以确保内部参数和值被转换为 to/from JSON 字符串。
只要多一点爱,这可以改进以使用其他转换器处理其他数据类型,但我没有时间这样做。如果你有足够的勇气尝试一下,我就把它留给你了:-)
在服务器端
首先,我对 AmqpInvokerServiceExporter
进行了子class,以便能够添加对转换 to/from JSON 对象的支持。第一步是将方法参数从 JSON 转换为相应的类型。第二步是将对象中的 returned 值转换为相应的 JSON 字符串以将其发回。
public class JSONAmqpInvokerServiceExporter extends AmqpInvokerServiceExporter {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void onMessage(Message message) {
Address replyToAddress = message.getMessageProperties().getReplyToAddress();
if (replyToAddress == null) {
throw new AmqpRejectAndDontRequeueException("No replyToAddress in inbound AMQP Message");
}
Object invocationRaw = getMessageConverter().fromMessage(message);
RemoteInvocationResult remoteInvocationResult;
if (invocationRaw == null || !(invocationRaw instanceof RemoteInvocation)) {
remoteInvocationResult = new RemoteInvocationResult(
new IllegalArgumentException("The message does not contain a RemoteInvocation payload"));
}
else {
RemoteInvocation invocation = (RemoteInvocation) invocationRaw;
int argCount = invocation.getArguments().length;
if (argCount > 0) {
Object[] arguments = invocation.getArguments();
Class<?>[] parameterTypes = invocation.getParameterTypes();
for (int i = 0; i < argCount; i++) {
try {
//convert arguments from JSON strings to objects
arguments[i] = objectMapper.readValue(arguments[i].toString(), parameterTypes[i]);
}
catch (IOException cause) {
throw new MessageConversionException(
"Failed to convert JSON to value: " + arguments[i] + " of type" + parameterTypes[i], cause);
}
}
}
remoteInvocationResult = invokeAndCreateResult(invocation, getService());
}
send(remoteInvocationResult, replyToAddress);
}
private void send(RemoteInvocationResult result, Address replyToAddress) {
Object value = result.getValue();
if (value != null) {
try {
//convert the returning value from a model to a JSON string
//before we send it back
Object json = objectMapper.writeValueAsString(value);
result.setValue(json);
}
catch (JsonProcessingException cause) {
throw new MessageConversionException("Failed to convert value to JSON: " + value, cause);
}
}
Message message = getMessageConverter().toMessage(result, new MessageProperties());
getAmqpTemplate().send(replyToAddress.getExchangeName(), replyToAddress.getRoutingKey(), message);
}
}
现在,根据这个 class 定义,我将服务侦听器的定义更改为如下内容:
<bean id="toteServiceListener" class="amqphack.FFDAmqpInvokerServiceExporter">
<property name="serviceInterface" value="ampqphack.ToteService"/>
<property name="service" ref="defaultToteService"/>
<property name="amqpTemplate" ref="rabbitTemplate"/>
</bean>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="toteServiceListener" queue-names="tote-service"/>
</rabbit:listener-container>
在这种情况下我使用了常规的 AmqTemplate
,因为我知道 ResultInvocationValue 总是会转换为 JSON 字符串,所以我不介意 InvocationResult 是否使用传统的序列化Java连载。
在客户端
在客户端我不得不改变一些东西。首先,我需要在调用之前将我们发送到的任何参数转换为 JSON 字符串,但我们仍然保留它们的参数类型。幸运的是,现有的 AmqpProxyFactoryBean
接受一个 remoteInvocationFactory
参数,我们可以在其中拦截调用并更改它。所以我首先定义并新建 RemoteInvocationFactory
:
public class JSONRemoteInvocationFactory implements RemoteInvocationFactory {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) {
RemoteInvocation invocation = new RemoteInvocation(methodInvocation);
if (invocation.getParameterTypes() != null) {
int paramCount = invocation.getParameterTypes().length;
Object[] arguments = new Object[paramCount];
try {
for (int i = 0; i < paramCount; i++) {
arguments[i] = mapper.writeValueAsString(invocation.getArguments()[i]);
}
invocation.setArguments(arguments);
}
catch (JsonProcessingException cause) {
throw new RuntimeException(
"Failed converting arguments to json: " + Arrays.toString(invocation.getArguments()), cause);
}
}
return invocation;
}
}
但这还不够。当我们得到结果时,我们需要将它的结果再次转回一个 Java 对象。为此,我们可以使用预期的 return 类型的服务接口。为此,我扩展了 exist AmqpProxyFactoryBean
以简单地将其结果(我知道它永远是一个字符串)转换为 Java 模型。
public class JSONAmqpProxyFactoryBean extends AmqpProxyFactoryBean {
private final ObjectMapper mapper = DefaultObjectMapper.createDefaultObjectMapper();
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Object ret = super.invoke(invocation);
return mapper.readValue(ret.toString(), invocation.getMethod().getReturnType());
}
}
有了这个,我可以像这样定义我的客户端:
<bean id="toteService" class="amqphack.JSONAmqpProxyFactoryBean">
<property name="amqpTemplate" ref="rabbitTemplate"/>
<property name="serviceInterface" value="amqphack.ToteService"/>
<property name="routingKey" value="tote-service"/>
<property name="remoteInvocationFactory" ref="remoteInvocationFactory"/>
</bean>
在此之后,一切都像一个魅力:
ToteService toteService = context.getBean("toteService", ToteService.class);
ToteModel tote = toteService.findTote("18251", "ABCD");
由于我没有更改传统转换器,这意味着异常仍然在 InvocationResult
.
不知道它是否仍然需要,但这就是我解决使用 JSON 和 AmqpProxyFactoryBean
/ AmqpInvokerServiceExporter
的问题的方法。在客户端我使用 Jackson2JsonMessageConverter
转换器,在服务器端使用 RemoteInvocationAwareMessageConverterAdapter
包装 Jackson2JsonMessageConverter
转换器。
ClientConfig.java
:
import com.stayfriends.commons.services.interfaces.GameService;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.remoting.client.AmqpProxyFactoryBean;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ClientConfig {
@Bean
public RabbitTemplate gameServiceTemplate(ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter messageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setExchange("rpc");
template.setMessageConverter(messageConverter);
return template;
}
@Bean
public ServiceAmqpProxyFactoryBean gameServiceProxy2(@Qualifier("gameServiceTemplate") RabbitTemplate template) {
return new ServiceAmqpProxyFactoryBean(template);
}
public static class ServiceAmqpProxyFactoryBean implements FactoryBean<Service>, InitializingBean {
private final AmqpProxyFactoryBean proxy;
ServiceAmqpProxyFactoryBean(RabbitTemplate template) {
proxy = new AmqpProxyFactoryBean();
proxy.setAmqpTemplate(template);
proxy.setServiceInterface(GameService.class);
proxy.setRoutingKey(GameService.class.getSimpleName());
}
@Override
public void afterPropertiesSet() {
proxy.afterPropertiesSet();
}
@Override
public Service getObject() throws Exception {
return (Service) proxy.getObject();
}
@Override
public Class<?> getObjectType() {
return Service.class;
}
@Override
public boolean isSingleton() {
return proxy.isSingleton();
}
}
}
ServerConfig.java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter;
import org.springframework.amqp.support.converter.RemoteInvocationAwareMessageConverterAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ServerConfig {
@Bean
public DirectExchange serviceExchange() {
return new DirectExchange("rpc");
}
@Bean
public Queue serviceQueue() {
return new Queue(Service.class.getSimpleName());
}
@Bean
public Binding binding(@Qualifier("serviceQueue") Queue queue, @Qualifier("serviceExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(Service.class.getSimpleName()).noargs();
}
@Bean("remoteInvocationAwareMessageConverter")
@Primary
public RemoteInvocationAwareMessageConverterAdapter remoteInvocationAwareMessageConverterAdapter(
Jackson2JsonMessageConverter jsonMessageConverter) {
return new RemoteInvocationAwareMessageConverterAdapter(jsonMessageConverter);
}
@Bean
public AmqpInvokerServiceExporter exporter(RabbitTemplate template, ServiceImpl service,
RemoteInvocationAwareMessageConverterAdapter messageConverter) {
AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter();
exporter.setAmqpTemplate(template);
exporter.setService(service);
exporter.setServiceInterface(Service.class);
exporter.setMessageConverter(messageConverter);
return exporter;
}
@Bean
public MessageListenerContainer container(ConnectionFactory connectionFactory,
@Qualifier("serviceQueue") Queue queue,
AmqpInvokerServiceExporter exporter) {
DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
container.setQueues(queue);
container.setMessageListener(exporter);
container.setConsumersPerQueue(5);
return container;
}
}