不可能(?)NullPointerException - Springframework RabbitMQ,无法调用 afterAckCallback
Impossible (?) NullPointerException - Springframework RabbitMQ, Failed to invoke afterAckCallback
我运行正在使用 RabbitMQ Server 3.8.9、spring-amqp-2.2.10.RELEASE 和 spring 的 Java 应用程序]-rabbit-2.2.10.RELEASE.
我的测试用例执行如下操作:
- 启动 RabbitMQ 服务器
- 启动我的 Java 应用程序
- 测试并验证我的 Java 应用程序的某些功能
- 优雅地停止我的 Java 应用程序
- 优雅地停止 RabbitMQ 服务器
- 再重复1-6几次
一切看起来都很好,除了有时在大约 10 分钟后的一次重新启动期间,我在我的应用程序日志中看到以下错误:
2021-02-05 12:52:46.498 UTC,ERROR,org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl,null,rabbitConnectionFactory23,runWorker():1149,Failed to invoke afterAckCallback
java.lang.NullPointerException: null
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.lambda$doHandleConfirm(PublisherCallbackChannelImpl.java:1027) ~[spring-rabbit.jar:2.2.10.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181]
进一步的分析没有指向任何具体的东西。 RabbitMQ 日志文件中没有错误,RabbitMQ 服务器没有重启,在上述时间戳期间 RabbitMQ 日志中没有任何异常。
有问题的代码:
我的测试是自动化的,并且 运行 作为 CI 管道的一部分。该问题是间歇性的,我无法在我的沙箱中本地重现它。
据我所知,我的 Java 应用程序的功能未受影响。
创建到处使用的 RabbitMQ 连接工厂的代码:
final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(HOST_NAME);
connectionFactory.setChannelCacheSize(1);
connectionFactory.setPublisherConfirms(true);
这似乎是一个并发问题,但我不太确定如何找到它的根源。在大多数情况下,我们使用 RabbitTemplate 和其他 Spring 工具连接到 RabbitMQ。
Spring 世界上任何对 RabbitMQ 有一定了解的人愿意插话吗?
谢谢
你说的代码是这样的:
finally {
try {
if (this.afterAckCallback != null && getPendingConfirmsCount() == 0) {
this.afterAckCallback.accept(this);
this.afterAckCallback = null;
}
}
catch (Exception e) {
this.logger.error("Failed to invoke afterAckCallback", e);
}
}
this.afterAckCallback
属性 确实可能存在竞争条件。
我们可以在一个中传递 if()
,但随后不同的线程将 this.afterAckCallback
作为 null
,因此我们因该 NPE 而失败。
我们必须将它的值复制到局部变量中,然后检查并执行 accept()
.
欢迎针对 Spring AMQP 项目提出 GitHub 问题:https://github.com/spring-projects/spring-amqp/issues
我们有一个竞争条件,因为我们真的用它在 processMultipleAck()
.
循环中的异步逻辑来调用这个 doHandleConfirm()
我运行正在使用 RabbitMQ Server 3.8.9、spring-amqp-2.2.10.RELEASE 和 spring 的 Java 应用程序]-rabbit-2.2.10.RELEASE.
我的测试用例执行如下操作:
- 启动 RabbitMQ 服务器
- 启动我的 Java 应用程序
- 测试并验证我的 Java 应用程序的某些功能
- 优雅地停止我的 Java 应用程序
- 优雅地停止 RabbitMQ 服务器
- 再重复1-6几次
一切看起来都很好,除了有时在大约 10 分钟后的一次重新启动期间,我在我的应用程序日志中看到以下错误:
2021-02-05 12:52:46.498 UTC,ERROR,org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl,null,rabbitConnectionFactory23,runWorker():1149,Failed to invoke afterAckCallback
java.lang.NullPointerException: null
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.lambda$doHandleConfirm(PublisherCallbackChannelImpl.java:1027) ~[spring-rabbit.jar:2.2.10.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181]
进一步的分析没有指向任何具体的东西。 RabbitMQ 日志文件中没有错误,RabbitMQ 服务器没有重启,在上述时间戳期间 RabbitMQ 日志中没有任何异常。
有问题的代码:
我的测试是自动化的,并且 运行 作为 CI 管道的一部分。该问题是间歇性的,我无法在我的沙箱中本地重现它。
据我所知,我的 Java 应用程序的功能未受影响。
创建到处使用的 RabbitMQ 连接工厂的代码:
final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(HOST_NAME);
connectionFactory.setChannelCacheSize(1);
connectionFactory.setPublisherConfirms(true);
这似乎是一个并发问题,但我不太确定如何找到它的根源。在大多数情况下,我们使用 RabbitTemplate 和其他 Spring 工具连接到 RabbitMQ。
Spring 世界上任何对 RabbitMQ 有一定了解的人愿意插话吗?
谢谢
你说的代码是这样的:
finally {
try {
if (this.afterAckCallback != null && getPendingConfirmsCount() == 0) {
this.afterAckCallback.accept(this);
this.afterAckCallback = null;
}
}
catch (Exception e) {
this.logger.error("Failed to invoke afterAckCallback", e);
}
}
this.afterAckCallback
属性 确实可能存在竞争条件。
我们可以在一个中传递 if()
,但随后不同的线程将 this.afterAckCallback
作为 null
,因此我们因该 NPE 而失败。
我们必须将它的值复制到局部变量中,然后检查并执行 accept()
.
欢迎针对 Spring AMQP 项目提出 GitHub 问题:https://github.com/spring-projects/spring-amqp/issues
我们有一个竞争条件,因为我们真的用它在 processMultipleAck()
.
doHandleConfirm()