无法关闭使用 Runtime.exit、context.close、System.exit() 将消息发布到 kafka spring 引导应用程序的 jms 侦听器
Not able to shutdown the jms listener which posts message to kafka spring boot application with Runtime.exit, context.close, System.exit()
我正在开发一个 spring 引导应用程序,它将使用
监听 ibm mq
@JmsListener(id="abc", destination="${queueName}", containerFactory="defaultJmsListenerContainerFactory")
我有一个 JmsListenerEndpointRegistry
启动 listenerContainer
。
On message 将尝试将具有某些业务逻辑的相同消息推送到kafka。海报代码是
kafkaTemplate.send(kafkaProp.getTopic(), uniqueId, message)
现在,如果 kafka 生产者失败,我希望我的启动应用程序被终止。所以我添加了一个自定义
setErrorHandler.
所以我试过了
`System.exit(1)`, `configurableApplicationContextObject.close()`, `Runtime.getRuntime.exit(1)`.
但其中 none 有效。以下是之后生成的日志
System.exit(0)
或以上。
2018-05-24 12:12:47.981 INFO 18904 --- [ Thread-4] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d08376: startup date [Thu May 24 12:10:35 IST 2018]; root of context hierarchy
2018-05-24 12:12:48.027 INFO 18904 --- [ Thread-4] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
2018-05-24 12:12:48.028 INFO 18904 --- [ Thread-4] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 0
2018-05-24 12:12:48.028 INFO 18904 --- [ Thread-4] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown
2018-05-24 12:12:48.028 INFO 18904 --- [ Thread-4] o.a.k.clients.producer.KafkaProducer : Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-24 12:12:48.044 INFO 18904 --- [ Thread-4] o.a.k.clients.producer.KafkaProducer : Closing the Kafka producer with timeoutMillis = 30000 ms.
但应用程序仍然是 运行,下面是 运行 个线程
Daemon Thread [Tomcat JDBC Pool Cleaner[14341596:1527144039908]] (Running)
Thread [DefaultMessageListenerContainer-1] (Running)
Thread [DestroyJavaVM] (Running)
Daemon Thread [JMSCCThreadPoolMaster] (Running)
Daemon Thread [RcvThread: com.ibm.mq.jmqi.remote.impl.RemoteTCPConnection@12474910[qmid=*******,fap=**,channel=****,ccsid=***,sharecnv=***,hbint=*****,peer=*******,localport=****,ssl=****]] (Running)
Thread [Thread-4] (Running)
非常感谢您的帮助。提前致谢。我只是希望应用程序应该退出。
下面是我调用之前的线程转储 System.exit(1)
"DefaultMessageListenerContainer-1"
java.lang.Thread.State: RUNNABLE
at sun.management.ThreadImpl.getThreadInfo1(Native Method)
at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:174)
at com.QueueErrorHandler.handleError(QueueErrorHandler.java:42)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:931)
at org.springframework.jms.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:902)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:235)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
at java.lang.Thread.run(Thread.java:745)
您应该进行线程转储以查看 Thread [DefaultMessageListenerContainer-1] (Running)
正在做什么。
Now in case a kafka producer fails
什么样的失败?如果 broker 宕机,线程默认会在生产者库中阻塞最多 60 秒。
您可以通过设置 max.block.ms
生产者 属性 来减少该时间。
上面的几个解决方案对我有用。
解决方案1。
获取错误处理程序中的所有线程并将它们全部中断,然后退出系统。
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
for (ThreadInfo threadInfo : threadInfos) {
Thread.currentThread().interrupt();
}
System.exit(1);
解决方案 2. 定义应用程序上下文管理器。喜欢
public class AppContextManager implements ApplicationContextAware {
private static ApplicationContext _appCtx;
@Override
public void setApplicationContext(ApplicationContext ctx){
_appCtx = ctx;
}
public static ApplicationContext getAppContext(){
return _appCtx;
}
public static void exit(Integer exitCode) {
System.exit(SpringApplication.exit(_appCtx,() -> exitCode));
}
}
然后使用相同的管理器在错误处理程序中退出
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
jmsListenerEndpointRegistry.stop();
AppContextManager.exit(-1);
}
});
我正在开发一个 spring 引导应用程序,它将使用
监听 ibm mq@JmsListener(id="abc", destination="${queueName}", containerFactory="defaultJmsListenerContainerFactory")
我有一个 JmsListenerEndpointRegistry
启动 listenerContainer
。
On message 将尝试将具有某些业务逻辑的相同消息推送到kafka。海报代码是
kafkaTemplate.send(kafkaProp.getTopic(), uniqueId, message)
现在,如果 kafka 生产者失败,我希望我的启动应用程序被终止。所以我添加了一个自定义
setErrorHandler.
所以我试过了
`System.exit(1)`, `configurableApplicationContextObject.close()`, `Runtime.getRuntime.exit(1)`.
但其中 none 有效。以下是之后生成的日志
System.exit(0)
或以上。
2018-05-24 12:12:47.981 INFO 18904 --- [ Thread-4] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d08376: startup date [Thu May 24 12:10:35 IST 2018]; root of context hierarchy
2018-05-24 12:12:48.027 INFO 18904 --- [ Thread-4] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
2018-05-24 12:12:48.028 INFO 18904 --- [ Thread-4] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 0
2018-05-24 12:12:48.028 INFO 18904 --- [ Thread-4] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown
2018-05-24 12:12:48.028 INFO 18904 --- [ Thread-4] o.a.k.clients.producer.KafkaProducer : Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-24 12:12:48.044 INFO 18904 --- [ Thread-4] o.a.k.clients.producer.KafkaProducer : Closing the Kafka producer with timeoutMillis = 30000 ms.
但应用程序仍然是 运行,下面是 运行 个线程
Daemon Thread [Tomcat JDBC Pool Cleaner[14341596:1527144039908]] (Running)
Thread [DefaultMessageListenerContainer-1] (Running)
Thread [DestroyJavaVM] (Running)
Daemon Thread [JMSCCThreadPoolMaster] (Running)
Daemon Thread [RcvThread: com.ibm.mq.jmqi.remote.impl.RemoteTCPConnection@12474910[qmid=*******,fap=**,channel=****,ccsid=***,sharecnv=***,hbint=*****,peer=*******,localport=****,ssl=****]] (Running)
Thread [Thread-4] (Running)
非常感谢您的帮助。提前致谢。我只是希望应用程序应该退出。
下面是我调用之前的线程转储 System.exit(1)
"DefaultMessageListenerContainer-1"
java.lang.Thread.State: RUNNABLE
at sun.management.ThreadImpl.getThreadInfo1(Native Method)
at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:174)
at com.QueueErrorHandler.handleError(QueueErrorHandler.java:42)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:931)
at org.springframework.jms.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:902)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:235)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
at java.lang.Thread.run(Thread.java:745)
您应该进行线程转储以查看 Thread [DefaultMessageListenerContainer-1] (Running)
正在做什么。
Now in case a kafka producer fails
什么样的失败?如果 broker 宕机,线程默认会在生产者库中阻塞最多 60 秒。
您可以通过设置 max.block.ms
生产者 属性 来减少该时间。
上面的几个解决方案对我有用。
解决方案1。 获取错误处理程序中的所有线程并将它们全部中断,然后退出系统。
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
for (ThreadInfo threadInfo : threadInfos) {
Thread.currentThread().interrupt();
}
System.exit(1);
解决方案 2. 定义应用程序上下文管理器。喜欢
public class AppContextManager implements ApplicationContextAware {
private static ApplicationContext _appCtx;
@Override
public void setApplicationContext(ApplicationContext ctx){
_appCtx = ctx;
}
public static ApplicationContext getAppContext(){
return _appCtx;
}
public static void exit(Integer exitCode) {
System.exit(SpringApplication.exit(_appCtx,() -> exitCode));
}
}
然后使用相同的管理器在错误处理程序中退出
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
jmsListenerEndpointRegistry.stop();
AppContextManager.exit(-1);
}
});