RabbitTemplate 扩展 DirectReplyToMessageListenerContainer

RabbitTemplate extending DirectReplyToMessageListenerContainer

我想使用 RabbitTemplate#sendAndReceive 向队列发送一条消息,并接收多条回复消息,而不只是一条。我原本打算通过使用自己的 MessageListenerContainer 来扩展这一行为,但 RabbitTemplate 似乎是直接实例化 DirectReplyToMessageListenerContainer 的。

目前,当 2 条具有相同 correlation-id 的消息到达 amq.rabbitmq.reply-to 时,会抛出如下异常:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1569) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1480) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:928) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:917) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at com.rabbitmq.client.impl.ConsumerDispatcher.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
    at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2270) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener(DirectReplyToMessageListenerContainer.java:114) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    ... 10 common frames omitted

在使用 sendAndReceive 方法时,是否有其他方式(如果有的话)可以接收多个响应?

它并不是为这种用法而设计的;您需要使用 RabbitTemplate.send() 操作和一个独立的侦听器容器,并在您自己的代码中对回复进行关联。

编辑

下面是实现它的一种方法(前提是您知道预期会收到多少条回复)……

/**
 * Demo application that sends the payload {@code "foo"} to the {@code "fanout"} exchange
 * through a {@link MultiReplyTemplate} and prints the list of converted replies.
 *
 * <p>NOTE(review): the exchange, the queues {@code "foo"} and {@code "bar"} and their bindings
 * are not declared here; presumably they already exist on the broker — confirm.
 */
@SpringBootApplication
public class So53206036Application {

    public static void main(String[] args) {
        SpringApplication.run(So53206036Application.class, args);
    }

    /**
     * Builds the template used for the request, wired with the given connection factory and
     * the list-aware message converter.
     */
    @Bean
    public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
        MultiReplyTemplate multiReplyTemplate = new MultiReplyTemplate();
        multiReplyTemplate.setConnectionFactory(cf);
        multiReplyTemplate.setMessageConverter(listConverter());
        return multiReplyTemplate;
    }

    /** Converter that wraps a {@link SimpleMessageConverter} as its delegate. */
    @Bean
    public ListConverter listConverter() {
        SimpleMessageConverter simpleConverter = new SimpleMessageConverter();
        return new ListConverter(simpleConverter);
    }

    /** Listener on queue {@code "foo"}: returns the payload upper-cased. */
    @RabbitListener(queues = "foo")
    public String listen1(String in) {
        return in.toUpperCase();
    }

    /** Listener on queue {@code "bar"}: returns the payload concatenated with itself. */
    @RabbitListener(queues = "bar")
    public String listen2(String in) {
        return in + in;
    }

    /** Runner that performs the single request and prints whatever list comes back. */
    @Bean
    public ApplicationRunner runner(MultiReplyTemplate template) {
        return args -> {
            ParameterizedTypeReference<List<String>> replyType =
                    new ParameterizedTypeReference<List<String>>() { };
            List<String> replies =
                    template.convertSendAndReceiveAsType("fanout", "", "foo", replyType);
            System.out.println(replies);
        };
    }

}

/**
 * A {@link RabbitTemplate} that collects several replies carrying the same correlation id and
 * hands them to the superclass as one combined message.
 *
 * <p>The combined message has an empty body; the individual replies are stored as a
 * {@code List<Message>} under the {@code "replies"} header, and the correlation id of the
 * originals is copied onto it before it is passed to {@code super.onMessage}.
 *
 * <p>Limitation: a partially collected group is only removed once the expected number of
 * replies has arrived, so groups that never complete stay in the map.
 */
class MultiReplyTemplate extends RabbitTemplate {

    private static final byte[] NOBODY = new byte[0];

    /** Header under which the collected replies are stored on the combined message. */
    private static final String REPLIES_HEADER = "replies";

    /** Partially collected reply groups, keyed by correlation id. */
    private final Map<String, Message> replies = new HashMap<>();

    /** Number of replies that completes a group; defaults to 2. */
    private volatile int expectedReplies = 2;

    /**
     * Sets how many replies must arrive for one correlation id before the combined message is
     * released.
     *
     * @param expectedReplies the number of replies to wait for; must be at least 1
     * @throws IllegalArgumentException if {@code expectedReplies} is less than 1
     */
    public void setExpectedReplies(int expectedReplies) {
        if (expectedReplies < 1) {
            throw new IllegalArgumentException(
                    "expectedReplies must be at least 1 but was " + expectedReplies);
        }
        this.expectedReplies = expectedReplies;
    }

    /**
     * Adds the reply to the group for its correlation id and, once the group is complete,
     * forwards the combined message to the superclass.
     *
     * @param message a single reply
     */
    @Override
    public void onMessage(Message message) {
        // The map is not thread-safe; the original author relies on the reply container
        // delivering on a single thread.
        // NOTE(review): confirm this holds when several requests are in flight concurrently.
        String corr = message.getMessageProperties().getCorrelationId();
        Message combined = this.replies.computeIfAbsent(corr, key -> newCombinedMessage());
        @SuppressWarnings("unchecked") // the header is only ever written by newCombinedMessage()
        List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get(REPLIES_HEADER);
        list.add(message);
        // ">=" rather than "==" so a group still completes if the expected count was lowered
        // while it was being collected.
        if (list.size() >= this.expectedReplies) {
            this.replies.remove(corr);
            combined.getMessageProperties().setCorrelationId(corr);
            super.onMessage(combined);
        }
    }

    /** Creates an empty-bodied message whose "replies" header holds a fresh, empty list. */
    private static Message newCombinedMessage() {
        Message combined = new Message(NOBODY, new MessageProperties());
        combined.getMessageProperties().getHeaders().put(REPLIES_HEADER, new ArrayList<Message>());
        return combined;
    }

}

/**
 * A {@link SmartMessageConverter} that converts a combined message, whose {@code "replies"}
 * header holds a {@code List<Message>}, into a list of converted payloads. All single-message
 * conversion is handed to the wrapped delegate.
 */
class ListConverter implements SmartMessageConverter {

    /** Header under which a combined message carries its individual replies. */
    private static final String REPLIES_HEADER = "replies";

    private final MessageConverter delegate;

    /**
     * @param delegate the converter used for every individual message
     */
    ListConverter(MessageConverter delegate) {
        this.delegate = delegate;
    }

    /** Delegates outbound conversion unchanged. */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return this.delegate.toMessage(object, messageProperties);
    }

    /** Delegates inbound conversion of a single message unchanged (used for listeners). */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return this.delegate.fromMessage(message); // for listeners
    }

    /**
     * Converts each message stored in the {@code "replies"} header with the delegate and
     * returns the results as a list, in header order.
     *
     * <p>If the header is missing or is not a list, the message is treated as an ordinary
     * single message and converted by the delegate instead of failing with a
     * {@link NullPointerException}.
     *
     * @param message the combined (or ordinary) message
     * @param conversionHint ignored by this implementation
     * @return a {@code List} of converted payloads, or the delegate's result for an ordinary
     *         message
     */
    @Override
    public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
        Object header = message.getMessageProperties().getHeaders().get(REPLIES_HEADER);
        if (!(header instanceof List)) {
            // Not a combined message: fall back to plain single-message conversion.
            return this.delegate.fromMessage(message);
        }
        @SuppressWarnings("unchecked") // combined messages store a List<Message> in this header
        List<Message> list = (List<Message>) header;
        return list.stream()
                .map(this.delegate::fromMessage)
                .collect(Collectors.toList());
    }

}

[FOO, foofoo]