Fixed reply queue times out after second message

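I'm trying to implement a RabbitMQ setup that lets me use a fixed reply queue instead of ending up with hundreds of temporary queues. The first message I publish gets an immediate response via the reply queue, but the second, third, and sometimes even the fifth message only produce a stack trace saying "Reply received after timeout". If I wait a moment and then send another message, I get a response again, and any messages sent immediately afterwards fail again with the same error.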

On the publisher side I have the following configuration:

<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
        id="connectionFactory"
        port="${rabbit.port}"
        virtual-host="${rabbit.virtual}"
        host="${rabbit.host}"
        username="${rabbit.username}"
        password="${rabbit.password}"
        connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
        id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-timeout="${rabbit.rpc.timeout}"
        reply-queue="reply">
    <rabbit:reply-listener />
</rabbit:template>

<rabbit:queue id="reply" name="reply" />

On the consumer side I have the following configuration:

<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
        id="connectionFactory"
        port="${rabbit.port}"
        virtual-host="${rabbit.virtual}"
        host="${rabbit.host}"
        username="${rabbit.username}"
        password="${rabbit.password}"
        connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
        id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-timeout="${rabbit.rpc.timeout}"
        reply-queue="reply">
    <rabbit:reply-listener concurrency="${rabbit.consumers}" />
</rabbit:template>

<!-- Register Queue Listener Beans -->
<rabbit:listener-container
        connection-factory="connectionFactory"
        channel-transacted="true"
        requeue-rejected="true"
        concurrency="${rabbit.consumers}">
    <rabbit:listener queues="test" ref="TestProcessor" method="onMessage" />
</rabbit:listener-container>

<rabbit:queue id="test" name="test" />
<rabbit:queue id="reply" name="reply" />

I'm using spring-amqp 1.4.4, in case that is of any use:

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.4.4.RELEASE</version>
    </dependency>

This is how I build and publish the message:

MessageProperties properties = new MessageProperties();          
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(toJson(request).getBytes(), properties);
Message res = getTemplate().sendAndReceive(exchange, queue, message);

The template is just an autowired instance of AmqpTemplate:

@Autowired
AmqpTemplate template;

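The first message gets an immediate response; the second message (and the third, and so on) produces the following stack trace on the consumer side: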

2015-04-22 07:53:03,329 [SimpleAsyncTaskExecutor-1] WARN  org.springframework.amqp.rabbit.core.RabbitTemplate - Reply received after timeout for 4bfb2f6f-2e31-414c-9ec3-a4672e4c7e34
2015-04-22 07:53:03,330 [SimpleAsyncTaskExecutor-1] WARN  org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:167)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
    at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799)
    ... 10 more

...and the publisher times out after receiving no response on the reply queue.

This is how I reply to the message on the consumer side:

   @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            ...
            System.out.println(message);
            // handle reply-to
            if (message.getMessageProperties() != null && message.getMessageProperties().getReplyTo() != null) {

                Message res = new Message(toJson(response).getBytes(), message.getMessageProperties());
                getTemplate().send("", message.getMessageProperties().getReplyTo(), res);

            }
        } catch (Exception e) {
            e.printStackTrace();
            // TODO: forward to exception queue here
        }    
    }

System.out.println(message); prints the following:

(Body:'{"message":"Sent 'Test Text' on Wed Apr 22 08:17:13 SAST 2015"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=[56, 50, 98, 100, 100, 56, 53, 54, 45, 57, 101, 100, 102, 45, 52, 99, 54, 97, 45, 97, 55, 51, 101, 45, 102, 54, 48, 101, 50, 49, 48, 53, 55, 101, 97, 48], replyTo=reply, contentType=application/json, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=test, deliveryTag=1, messageCount=0])
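For readability: the correlationId in the output above is printed as a list of byte values, which is consistent with this version of MessageProperties holding the correlation id as a byte[]. The bytes are plain ASCII characters. A minimal sketch of decoding them, where the class name CorrelationIdDemo and the helper decode are made up for illustration:

```java
import java.nio.charset.StandardCharsets;

public class CorrelationIdDemo {

    /** Decodes a correlation id carried as raw bytes into a readable string (hypothetical helper). */
    public static String decode(byte[] correlationId) {
        return new String(correlationId, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Byte values copied from the printed MessageProperties above.
        byte[] correlationId = {
            56, 50, 98, 100, 100, 56, 53, 54, 45, 57, 101, 100, 102, 45,
            52, 99, 54, 97, 45, 97, 55, 51, 101, 45,
            102, 54, 48, 101, 50, 49, 48, 53, 55, 101, 97, 48
        };
        // prints 82bdd856-9edf-4c6a-a73e-f60e21057ea0
        System.out.println(decode(correlationId));
    }
}
```

The decoded value has the same UUID shape as the id that appears in the "Reply received after timeout" log line, which makes it easier to compare what was sent with what was received.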

Any ideas?

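You have two rabbit templates, each listening on the same reply queue, so the "second" reply is "received" by the consumer-side template instead of the publisher. That is where the log message comes from: the consumer-side template gets a "reply" when it has no outstanding request waiting for one. The request that is actually waiting lives on the producer side, which never sees that reply and times out.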
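The effect can be reproduced with a toy model. This is only a sketch, not Spring AMQP's implementation: it assumes one reply listener per template and strict round-robin delivery between the two consumers of the reply queue, and the names SharedReplyQueueDemo, ReplyListener and simulate are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SharedReplyQueueDemo {

    /** Toy stand-in for a template whose reply listener consumes from the shared reply queue. */
    static class ReplyListener {
        final String name;
        final Set<String> pending = new HashSet<>(); // correlation ids still awaiting a reply

        ReplyListener(String name) {
            this.name = name;
        }

        /** Returns a log line describing what happened to the delivered reply. */
        String onReply(String correlationId) {
            if (pending.remove(correlationId)) {
                return name + " matched " + correlationId;
            }
            // No outstanding request: the case reported as "Reply received after timeout".
            return name + " has no pending request for " + correlationId;
        }
    }

    /** Sends the given number of requests one after another and returns one log line per reply. */
    public static List<String> simulate(int requests) {
        ReplyListener publisher = new ReplyListener("publisher");
        ReplyListener consumerSide = new ReplyListener("consumer-side");
        ReplyListener[] listeners = {publisher, consumerSide};

        List<String> log = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            String correlationId = "req-" + (i + 1);
            publisher.pending.add(correlationId); // only the publisher is waiting for replies
            // Assumed round-robin delivery between the two consumers of the reply queue.
            log.add(listeners[i % listeners.length].onReply(correlationId));
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : simulate(4)) {
            System.out.println(line);
        }
    }
}
```

In this model every other reply lands on the consumer-side listener, which has nothing pending, while the publisher keeps waiting for it. With several concurrent reply listeners on the consumer side the share of lost replies would be higher, which would fit the observation that several consecutive messages can fail.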

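Note that, starting with rabbitmq 3.4, it is generally best to use the new built-in rabbit direct reply-to feature; it generally resolves all the reasons we had to implement the fixed reply queue mechanism in the first place. Support for direct reply-to was added in Spring AMQP 1.4.1.RELEASE.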

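After struggling with this for several days, my one takeaway about the rabbit template's sendAndReceive() is: never meddle with the binding key, and let the framework set it to the queue name. Used that way it works fine, but when I try to be clever and set it myself, a lot of things go wrong.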

Now I can't get the correlation-id back: the correlation-id I receive is not the same as the one I sent. How is that possible?