Spring rabbitlistner 使用注解语法停止监听队列

Spring rabbitlistner stop listening to queue using annotation syntax

我和一位同事正在开发一个使用 Spring 的应用程序,它需要从 RabbitMQ 队列获取消息。这个想法是使用(通常是优秀的)spring 注释系统来做到这一点,以使代码易于理解。我们让系统使用@RabbitListner 注释工作,但我们希望按需获取消息。 @RabbitListner 注释不会这样做,它只是在消息可用时接收消息。需求由客户端的 "readiness" 决定,即客户端应该 "get" 来自 te 队列的消息停止列表并处理该消息。然后确定它是否准备好接收一个新的并重新连接到队列。

我们一直在研究仅使用 spring-amqp/spring-rabbit 模块手动完成此操作,虽然这可能是可能的,但我们真的很想使用 spring 来完成此操作。经过数小时的搜索和查阅文档,我们未能找到答案。

这是我们目前的接收码:

@RabbitListener(queues = "jobRequests")
public class Receiver {

@Autowired
private JobProcessor jobProcessor;

@RabbitHandler
public void receive(Job job) throws InterruptedException, IOException {
    System.out.println(" [x] Received '" + job + "'");
    jobProcessor.processJob(job);
}

}

作业处理器:

@Service
public class JobProcessor {

@Autowired
private RabbitTemplate rabbitTemplate;

public boolean processJob(Job job) throws InterruptedException, IOException {
    rabbitTemplate.convertAndSend("jobResponses", job);

    System.out.println(" [x] Processing job: " + job);

    rabbitTemplate.convertAndSend("processedJobs", job);

    return true;
}

}

换句话说,当接收器接收到作业时,它应该停止侦听新作业并等待作业处理器完成,然后开始列出新消息。


我们重新创建了空指针异常,这里是我们用来从服务器端发送的代码。

@Controller
public class MainController {

@Autowired
RabbitTemplate rabbitTemplate;

@Autowired
private Queue jobRequests;

@RequestMapping("/do-job")
public String doJob() {

    Job job = new Job(new Application(), "henk", 42);

    System.out.println(" [X] Job sent: " + job);

    rabbitTemplate.convertAndSend(jobRequests.getName(), job);

    return "index";
    }
}

然后是客户端的接收码

@Component
public class Receiver {

@Autowired
private JobProcessor jobProcessor;

@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

@RabbitListener(queues = "jobRequests")
public void receive(Job job) throws InterruptedException, IOException, TimeoutException {

    Collection<MessageListenerContainer> messageListenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();

    for (MessageListenerContainer listenerContainer :messageListenerContainers) {
        System.out.println(listenerContainer);
        listenerContainer.stop();
    }

    System.out.println(" [x] Received '" + job + "'");
    jobProcessor.processJob(job);

    for (MessageListenerContainer listenerContainer :messageListenerContainers) {
        listenerContainer.start();
    }
   }
}

以及更新后的作业处理器

@Service
public class JobProcessor {

public boolean processJob(Job job) throws InterruptedException, IOException {

    System.out.println(" [x] Processing job: " + job);

    return true;
}

}

和堆栈跟踪

[x] Received 'Job{application=com.olifarm.application.Application@aaa517, name='henk', id=42}'
[x] Processing job: Job{application=com.olifarm.application.Application@aaa517, name='henk', id=42}
Exception in thread "SimpleAsyncTaskExecutor-1" java.lang.NullPointerException
2015-12-18 11:17:44.494 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.isActive(SimpleMessageListenerContainer.java:838)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access0(SimpleMessageListenerContainer.java:93)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1301)
    at java.lang.Thread.run(Thread.java:745)
 WARN 325899 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it

java.lang.NullPointerException: null
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.isActive(SimpleMessageListenerContainer.java:838) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access0(SimpleMessageListenerContainer.java:93) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_91]

侦听器的停止有效,我们确实收到了一份新作业,但是当它尝试再次启动它时,将抛出 NPE。我们检查了 rabbitMQ 日志,发现连接关闭了大约 2 秒然后自动重新打开,即使我们在作业处理器中将线程置于睡眠状态。这可能是问题的根源?然而,这个错误并没有中断程序,在它被抛出之后,接收者仍然能够接收新的工作。我们是在滥用这里的机制还是这个有效代码?

要按需获取消息,通常使用 rabbitTemplate.receiveAndConvert() 比使用监听器更好;这样您就可以完全控制何时收到消息。

version 1.5 开始,您可以将模板配置为阻塞一段时间(或直到消息到达)。否则,如果没有消息,它会立即 returns null

侦听器实际上是为消息驱动的应用程序设计的。

如果您可以在作业完成之前阻塞侦听器中的线程,则不会传递更多消息 - 默认情况下容器只有一个线程。

如果您不能在作业完成之前阻塞线程,出于某种原因,您可以通过从 Endpoint Registry.[=15= 获取对它的引用来 stop()/start() 侦听器容器]

通常最好在单独的线程上停止容器。