使用多个 RabbitTemplate 对象时超时后收到回复

Reply received after timeout when using multiple RabbitTemplate objects

我有一个 Java Spring Boot 1.5.10 应用程序,我正在使用它连接到两个不同的 RabbitMQ 服务器。一个 RabbitMQ 服务器 运行 在与 Spring 引导应用程序相同的主机上,另一个在 different/remote 主机上。 这个版本的 Spring Boot 包括 org.springframework.amqp:spring-amqp:jar:1.7.6.RELEASE, 顺便说一下。因此,这是我的一些与本地 RabbitMQ 服务器相关的配置代码:

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory(host);
    factory.setVirtualHost(vhost);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setChannelCacheSize(2);

    // Add a custom client connection property, which will show up in the Admin UI (useful for troubleshooting).
    factory.getRabbitConnectionFactory().getClientProperties().put("Connection Type", "Local");

    return factory;
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory,
        MessageConverter jackson2JsonMessageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
    return rabbitTemplate;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory rabbitConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory);
    return factory;
}

@Bean
public RabbitAdmin admin(ConnectionFactory rabbitConnectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
    rabbitAdmin.afterPropertiesSet();
    rabbitAdmin.setAutoStartup(false);
    return rabbitAdmin;
}

下面是我在另一台主机上远程 运行 RabbitMQ 服务器的一些配置代码:

@Bean
public ConnectionFactory remoteConnectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory(remoteHost);
    factory.setVirtualHost(remoteVhost);
    factory.setUsername(remoteUsername);
    factory.setPassword(remotePassword);

    // By default, only one Channel will be cached, with further requested Channels being created and disposed on demand.
    // Consider raising the "channelCacheSize" value in case of a high-concurrency environment.
    factory.setChannelCacheSize(3);

    factory.setConnectionThreadFactory(new CustomizableThreadFactory("RemoteRabbit-"));

    // Add a custom client connection property, which will show up in the Admin UI (useful for troubleshooting).
    factory.getRabbitConnectionFactory().getClientProperties().put("Connection Type", "Remote");

    return factory;
}

@Bean
public RabbitAdmin remoteAdmin(ConnectionFactory remoteConnectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(remoteConnectionFactory);
    rabbitAdmin.setIgnoreDeclarationExceptions(true);
    return rabbitAdmin;
}

@Bean
public SimpleRabbitListenerContainerFactory remoteContainerFactory(ConnectionFactory remoteConnectionFactory,
        MessageConverter jackson2JsonMessageConverter) {

    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(remoteConnectionFactory);
    factory.setConcurrentConsumers(1);
    factory.setMessageConverter(jackson2JsonMessageConverter);
    factory.setMaxConcurrentConsumers(5);
    factory.setDefaultRequeueRejected(false); // on error, don't put back in the queue
    return factory;
}

精彩部分来了。我有另一个正在调用 convertSendAndReceive() 的 RabbitTemplate,在其上配置了 remoteConnectionFactory。

@Bean
public RabbitTemplate payTemplate(ConnectionFactory remoteConnectionFactory,
        TopicExchange fromExchange, MessageConverter jackson2JsonMessageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(remoteConnectionFactory);
    rabbitTemplate.setReplyAddress(fromExchange.getName() + "/" + buildMyBindingKey());
    rabbitTemplate.setReplyTimeout(30000L);
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
    return rabbitTemplate;
}

@Bean
public SimpleMessageListenerContainer payReplyContainer(ConnectionFactory remoteConnectionFactory,
        RabbitTemplate payTemplate, Queue fromQueue) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(remoteConnectionFactory);
    container.setQueues(fromQueue);
    container.setMessageListener(payTemplate);
    return container;
}

@Bean
public TopicExchange fromExchange(RabbitAdmin remoteAdmin) {
    TopicExchange exchange = new TopicExchange("some.from.exchange", true, false);
    exchange.setAdminsThatShouldDeclare(remoteAdmin);
    return exchange;
}

@Bean
public Queue fromQueue(RabbitAdmin remoteAdmin) {
    Queue queue = new Queue("myReplyQueue");
    queue.setAdminsThatShouldDeclare(corporateAdmin);
    return queue;
}

private String buildMyBindingKey() {
    return "someBindingKeyHere";
}

我在 payTemplate 上调用 convertSendAndReceive() 时出现了问题。回复很好,但是好像收到了两次。这在我只连接到一个 RabbitMQ 服务器时有效,但是在配置到两个服务器的连接后,我得到了这个:

2018-06-11 18:29:57,156|WARN||payReplyContainer-1|org.springframework.amqp.rabbit.core.RabbitTemplate|||||Reply received after timeout for 1

2018-06-11 18:29:57,165|WARN||payReplyContainer-1|org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler|||||Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:949)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:902)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:790)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access[=13=]1(SimpleMessageListenerContainer.java:105)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:208)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1349)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1292)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1262)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:105)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1518)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
        at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:1759)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:899)
        ... 10 common frames omitted

同样,payTemplate 确实得到了它正在等待的 reply/response,但它就像容器收到了另一条没有人在等待的消息。我很难过。任何帮助表示赞赏。

确实是一个交换(一个不应该存在的额外绑定)负责复制响应。非常感谢 Gary 检查了我的代码并且大概没有发现任何错误并提出了一些要看的东西。