不可能(?)NullPointerException - Springframework RabbitMQ,无法调用 afterAckCallback

Impossible (?) NullPointerException - Springframework RabbitMQ, Failed to invoke afterAckCallback

我运行正在使用 RabbitMQ Server 3.8.9、spring-amqp-2.2.10.RELEASE 和 spring 的 Java 应用程序]-rabbit-2.2.10.RELEASE.

我的测试用例执行如下操作:

  1. 启动 RabbitMQ 服务器
  2. 启动我的 Java 应用程序
  3. 测试并验证我的 Java 应用程序的某些功能
  4. 优雅地停止我的 Java 应用程序
  5. 优雅地停止 RabbitMQ 服务器
  6. 再重复1-6几次

一切看起来都很好,除了有时在大约 10 分钟后的一次重新启动期间,我在我的应用程序日志中看到以下错误:

2021-02-05 12:52:46.498 UTC,ERROR,org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl,null,rabbitConnectionFactory23,runWorker():1149,Failed to invoke afterAckCallback
java.lang.NullPointerException: null
    at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.lambda$doHandleConfirm(PublisherCallbackChannelImpl.java:1027) ~[spring-rabbit.jar:2.2.10.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_181]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181]

进一步的分析没有指向任何具体的东西。 RabbitMQ 日志文件中没有错误,RabbitMQ 服务器没有重启,在上述时间戳期间 RabbitMQ 日志中没有任何异常。

有问题的代码:

https://github.com/spring-projects/spring-amqp/blob/v2.2.10.RELEASE/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java#L1027

我的测试是自动化的,并且 运行 作为 CI 管道的一部分。该问题是间歇性的,我无法在我的沙箱中本地重现它。

据我所知,我的 Java 应用程序的功能未受影响。

创建到处使用的 RabbitMQ 连接工厂的代码:

final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(HOST_NAME);
connectionFactory.setChannelCacheSize(1);
connectionFactory.setPublisherConfirms(true);

这似乎是一个并发问题,但我不太确定如何找到它的根源。在大多数情况下,我们使用 RabbitTemplate 和其他 Spring 工具连接到 RabbitMQ。

Spring 世界上任何对 RabbitMQ 有一定了解的人愿意插话吗?

谢谢

你说的代码是这样的:

finally {
    try {
        if (this.afterAckCallback != null && getPendingConfirmsCount() == 0) {
                this.afterAckCallback.accept(this);
                this.afterAckCallback = null;
            }
        }
        catch (Exception e) {
            this.logger.error("Failed to invoke afterAckCallback", e);
        }
}

this.afterAckCallback 属性 确实可能存在竞争条件。 我们可以在一个中传递 if(),但随后不同的线程将 this.afterAckCallback 作为 null,因此我们因该 NPE 而失败。 我们必须将它的值复制到局部变量中,然后检查并执行 accept().

欢迎针对 Spring AMQP 项目提出 GitHub 问题:https://github.com/spring-projects/spring-amqp/issues

我们有一个竞争条件,因为我们真的用它在 processMultipleAck().

循环中的异步逻辑来调用这个 doHandleConfirm()