当队列被 Shoveled 时,convertSendAndReceive 不工作

convertSendAndReceive is not working when the queue is Shoveled

仅供参考,我是 RabbitMQ 的新手。

我的应用程序有这个用例,我正在尝试使用 RabbitMQ:

对于上述场景,我使用了 convertSendAndReceive,当生产者和消费者都在同一个 RabbitMQ 服务器中时,它就像一个魅力。但是当队列被 Shoveled 时同样不起作用。

如果我用错了请告诉我method/configuration w.r.t RabbitMQ。

提前致谢。

添加代码

消费者

public static void main(String[] args) throws InterruptedException {
       ConnectionFactory cf = new CachingConnectionFactory("10.223.19.89");        

        // set up the queue, exchange, binding on the broker
        RabbitAdmin admin = new RabbitAdmin(cf);
        Queue queue = new Queue("myQueue");
        Queue queueReply = new Queue("myQueue_reply");

        admin.declareQueue(queue);
        admin.declareQueue(queueReply);
        TopicExchange exchange = new TopicExchange("myExchange");

        admin.declareExchange(exchange);
        admin.declareBinding(
            BindingBuilder.bind(queue).to(exchange).with("foo.*"));
        admin.declareBinding(
                BindingBuilder.bind(queueReply).to(exchange).with("foo.*"));

        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(cf);
        Object listener = new Object() {
            public String handleMessage(String foo) {            
                return foo + "test";
            }
        };

        MessageListenerAdapter adapter = new MessageListenerAdapter(listener);

        container.setMessageListener(adapter);          
        container.setQueueNames("myQueue");         
        container.start();

}

制作人

public void run()
{  

    Thread t = Thread.currentThread();
    ConnectionFactory cf = new CachingConnectionFactory("10.223.19.93");

    RabbitTemplate template = new RabbitTemplate(cf);
    template.setExchange("myExchange");
    template.setRoutingKey("foo.bar");
    Queue queueReply = new Queue("myQueue_reply");
    template.setReplyQueue(queueReply);     

    Object test = template.convertSendAndReceive("Hello world");
    System.out.println(test.toString());

}

public static void main(String[] args) throws InterruptedException {
    for(int i=0; i< 5; i++)
    {
        Thread t = new Thread(new SendReceiveThread());
        t.setName("Thread # " + i);
        t.start();      
        Thread.sleep(100);
    }

}

最好的猜测是你需要使用 named reply queue 并铲它。

对于这种情况,您需要 <reply-listener/>

在rabbitmq 3.4之前,一个临时队列用于回复; direct reply-to 用于 3.4 及更高版本,但我的猜测是 rabbit 不会铲掉为此创建的伪队列。

编辑:

当使用固定回复队列并以编程方式创建 rabbit 模板时,您必须连接一个侦听器容器并将模板设置为侦听器。参见 the documentation

If you define your RabbitTemplate as a <bean/>, or using an @Configuration class to define it as an @Bean, or when creating the template programmatically, you will need to define and wire up the reply listener container yourself. If you fail to do this, the template will never receive the replies and will eventually time out and return null as the reply to a call to a sendAndReceive method.

在spring-rabbit-1.4中,您只需像这样配置消费者: {rabbit:template id="tutorialTemplate" connection-factory="connectionFactory" exchange="TRUST-EXCHANGE" routing-key="my.#" }

最重要的是您必须删除 "replyQueue" 配置元素。在这种情况下,您使用 spring 默认值 "reply-to",它位于 https://www.rabbitmq.com/tutorials/tutorial-six-java.html.