Spring RabbitMQ 的 SimpleMessageListenerContainer 因无效消息而中止

Spring SimpleMessageListenerContainer for RabbitMQ is aborting on invalid message

我正在使用 springs SimpleMessageListenerContainer 来使用来自 RabbitMQ 队列的消息。一切正常,但是当无效消息发送到队列时(例如无效 json),侦听器中止,正在关闭 worker 并且不接受任何进一步的消息。

是否可以将其配置为丢弃损坏的消息并继续收听更多消息?

我正在使用 sprint-rabbit-1.6。1.RELEASE.jar

我的配置如下所示:

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                MessageListenerAdapter listenerAdapter,
                                                MessageConverter messageConverter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("my.queue");
    container.setMessageListener(listenerAdapter);
    container.setMessageConverter(messageConverter);
    return container;
}

@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
MessageListenerAdapter listenerAdapter(Worker worker) {
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
    messageListenerAdapter.setMessageConverter(new Jackson2JsonMessageConverter());
    return messageListenerAdapter;
}

我的监听方法的声明:

   public void processMessage(Map<String, String> message) {

当我发送类似 '"routeId":"7"}' 的消息(损坏的 json)时,我收到异常:

2016-09-02 08:10:35.821  WARN 35841 --- [   container-29] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access[=13=]1(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) [spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at     org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted

2016-09-02 08:10:35.828 ERROR 35841 --- [   container-29]     o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception during processing

org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException: Invalid listener
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1351) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access[=13=]1(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 1 common frames omitted
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted

2016-09-02 08:10:35.833 ERROR 35841 --- [   container-29] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2016-09-02 08:10:35.833  INFO 35841 --- [   container-29] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2016-09-02 08:10:35.833  INFO 35841 --- [   container-29] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

这里抛出了SimpleMessageListenerContainer中的fatal Exception:

catch (ListenerExecutionFailedException ex) {
                    // Continue to process, otherwise re-throw
                    if (ex.getCause() instanceof NoSuchMethodException) {
                        throw new FatalListenerExecutionException("Invalid listener", ex);
                    }
                }

所以如果容器配置了一个不存在的方法,它似乎应该关闭。但是如果消息损坏,它会尝试使用错误的参数类型调用方法,这也会导致 NoSuchMethodException。这意味着任何生产者都可以用一条损坏的消息杀死我的消费者。

感谢任何建议!

有趣;我能够重现您的问题;事实证明,如果消息不包含 __TypeID__ header(转换提示),它只是 returns "bad" json 作为字符串。

我能够通过将自定义 class 映射器注入转换器来解决它。

您还可以让发送系统设置类型 header。

然后,邮件被拒绝,因为我们收到 MessageConversionException.

package com.example;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So39264965Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39264965Application.class, args);
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        template.convertAndSend("my.queue", new Foo());
        context.getBean(Worker.class).latch.await(60, TimeUnit.SECONDS);

        // bad json
        template.setMessageConverter(new SimpleMessageConverter());
        template.convertAndSend("", "my.queue", "\"routeId\":\"7\"}", m -> {
            m.getMessageProperties().setContentType("application/json");
            return m;
        });


        Thread.sleep(60000);
        context.close();
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter, MessageConverter messageConverter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("my.queue");
        container.setMessageListener(listenerAdapter);
        container.setMessageConverter(messageConverter);
        return container;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue queue() {
        return new Queue("my.queue");
    }

    @Bean
    public MessageConverter messageConverter() {
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        jackson2JsonMessageConverter.setClassMapper(new ClassMapper() {

            @Override
            public Class<?> toClass(MessageProperties properties) {
                return Foo.class;
            }

            @Override
            public void fromClass(Class<?> clazz, MessageProperties properties) {

            }

        });
        return jackson2JsonMessageConverter;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Worker worker) {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
        messageListenerAdapter.setMessageConverter(messageConverter());
        return messageListenerAdapter;
    }

    @Bean
    public Worker worker() {
        return new Worker();
    }

    public static class Worker {

        private final CountDownLatch latch = new CountDownLatch(1);

        public void processMessage(Foo foo) {
            System.out.println(foo);
            this.latch.countDown();
        }

    }

    public static class Foo {

        private String bar = "bar";

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}