Reactor 调度程序在主线程完成后保持 运行 很长时间?如何处理?
Reactor Schedulers keep running long after main thread is done?How to deal with this?
我对如何在使用 Reactor 3 时清理调度程序工作线程有疑问
Flux.range(1, 10000)
.publishOn(Schedulers.newElastic("Y"))
.doOnComplete(() -> {
// WHAT should one do to ensure the worker threads are cleaned up
logger.info("Shut down all Scheduler worker threads");
})
.subscribe(x -> logger.debug(x+ "**"));
当我执行上面的代码时,我看到的是一旦主线程完成 运行 工作线程 is/are 仍然处于 WAITING 状态一段时间。
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:748)
有没有办法控制它?即它们可以被处理掉 onComplete()
吗?我试过 Schedulers.shutdownNow()
但没有用。
另一方面,当我这样做时,我能够控制调度程序的处理。
preferred/advocated 方法是什么?
reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
Flux.range(1, 10000)
.concatWith(Flux.empty())
.publishOn(s)
.doOnComplete(() -> {
s.dispose();
logger.info("Shut down all Scheduler worker threads");
})
.subscribe(x -> logger.debug(x+ "**"));
您需要在助焊剂上调用 blockLast() 方法以确保它已完成。
如果你是运行你在并行或在主线程之外的另一个线程中流动,这一点尤其重要。
注意:需要在链上调用publishOn,参见the reference guide了解publishon的位置为何重要
reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
Flux.range(1, 10000)
.concatWith(Flux.empty())
.publishOn(s)
.doOnComplete(() -> {
logger.info("Shut down all Scheduler worker threads");
}).blocklast();
s.dispose();
如果您使用 Schedulers.new[Elastic|...]
,那么如果您想关闭它,您有责任跟踪结果 Scheduler
。 Schedulers.shutdownNow()
只会在您不明确时关闭库使用的默认调度程序,例如 Schedulers.elastic()
(注意没有 new
前缀)。
在所有操作有 运行 之后清理的最佳方法是使用 doFinally
。这将异步执行清理回调 after onError
|onComplete
|cancel
事件。最好确保它是链中的最后一个运算符,尽管它在所有情况下都试图真正最后执行。
唯一需要注意的是,它 运行 与之前的运算符在同一线程中,换句话说,您正试图关闭的线程... doFinally
回调中的 s.dispose()
将在 其任务队列已处理后 关闭执行程序,因此在这种情况下,线程之前会有轻微的延迟消失。
这是一个示例,它转储线程信息,切换到自定义弹性线程并在 doFinally
中将其关闭(添加过滤器和实体化以提供更短的日志,以便更好地了解事件是如何发生的):
@Test
public void schedulerFinallyShutdown() throws InterruptedException {
ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
Logger logger = Loggers.getLogger("foo");
CountDownLatch latch = new CountDownLatch(1);
reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
Flux.range(1, 10000)
.publishOn(s)
.concatWith(Flux.<Integer>empty().doOnComplete(() -> {
for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
System.out.println("last element\t" + ti.getThreadName() + " " + ti.getThreadState());
}
}))
.doFinally(sig -> {
s.dispose();
logger.info("Shut down all Scheduler worker threads");
latch.countDown();
})
.filter(x -> x % 1000 == 0)
.materialize()
.subscribe(x -> logger.info(x+ "**"));
latch.await();
for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
System.out.println("after cleanup\t" + ti.getThreadName() + " " + ti.getThreadState());
}
}
打印出来:
11:24:36.608 [X-2] INFO foo - onNext(1000)**
11:24:36.611 [X-2] INFO foo - onNext(2000)**
11:24:36.611 [X-2] INFO foo - onNext(3000)**
11:24:36.612 [X-2] INFO foo - onNext(4000)**
11:24:36.612 [X-2] INFO foo - onNext(5000)**
11:24:36.612 [X-2] INFO foo - onNext(6000)**
11:24:36.612 [X-2] INFO foo - onNext(7000)**
11:24:36.613 [X-2] INFO foo - onNext(8000)**
11:24:36.613 [X-2] INFO foo - onNext(9000)**
11:24:36.613 [X-2] INFO foo - onNext(10000)**
last element X-2 RUNNABLE
last element elastic-evictor-1 TIMED_WAITING
last element Monitor Ctrl-Break RUNNABLE
last element Signal Dispatcher RUNNABLE
last element Finalizer WAITING
last element Reference Handler WAITING
last element main WAITING
11:24:36.626 [X-2] INFO foo - onComplete()**
11:24:36.627 [X-2] INFO foo - Shut down all Scheduler worker threads
after cleanup Monitor Ctrl-Break RUNNABLE
after cleanup Signal Dispatcher RUNNABLE
after cleanup Finalizer WAITING
after cleanup Reference Handler WAITING
after cleanup main RUNNABLE
我对如何在使用 Reactor 3 时清理调度程序工作线程有疑问
Flux.range(1, 10000)
.publishOn(Schedulers.newElastic("Y"))
.doOnComplete(() -> {
// WHAT should one do to ensure the worker threads are cleaned up
logger.info("Shut down all Scheduler worker threads");
})
.subscribe(x -> logger.debug(x+ "**"));
当我执行上面的代码时,我看到的是一旦主线程完成 运行 工作线程 is/are 仍然处于 WAITING 状态一段时间。
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:748)
有没有办法控制它?即它们可以被处理掉 onComplete()
吗?我试过 Schedulers.shutdownNow()
但没有用。
另一方面,当我这样做时,我能够控制调度程序的处理。 preferred/advocated 方法是什么?
reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
Flux.range(1, 10000)
.concatWith(Flux.empty())
.publishOn(s)
.doOnComplete(() -> {
s.dispose();
logger.info("Shut down all Scheduler worker threads");
})
.subscribe(x -> logger.debug(x+ "**"));
您需要在助焊剂上调用 blockLast() 方法以确保它已完成。 如果你是运行你在并行或在主线程之外的另一个线程中流动,这一点尤其重要。
注意:需要在链上调用publishOn,参见the reference guide了解publishon的位置为何重要
reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
Flux.range(1, 10000)
.concatWith(Flux.empty())
.publishOn(s)
.doOnComplete(() -> {
logger.info("Shut down all Scheduler worker threads");
}).blocklast();
s.dispose();
如果您使用 Schedulers.new[Elastic|...]
,那么如果您想关闭它,您有责任跟踪结果 Scheduler
。 Schedulers.shutdownNow()
只会在您不明确时关闭库使用的默认调度程序,例如 Schedulers.elastic()
(注意没有 new
前缀)。
在所有操作有 运行 之后清理的最佳方法是使用 doFinally
。这将异步执行清理回调 after onError
|onComplete
|cancel
事件。最好确保它是链中的最后一个运算符,尽管它在所有情况下都试图真正最后执行。
唯一需要注意的是,它 运行 与之前的运算符在同一线程中,换句话说,您正试图关闭的线程... doFinally
回调中的 s.dispose()
将在 其任务队列已处理后 关闭执行程序,因此在这种情况下,线程之前会有轻微的延迟消失。
这是一个示例,它转储线程信息,切换到自定义弹性线程并在 doFinally
中将其关闭(添加过滤器和实体化以提供更短的日志,以便更好地了解事件是如何发生的):
@Test
public void schedulerFinallyShutdown() throws InterruptedException {
ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
Logger logger = Loggers.getLogger("foo");
CountDownLatch latch = new CountDownLatch(1);
reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
Flux.range(1, 10000)
.publishOn(s)
.concatWith(Flux.<Integer>empty().doOnComplete(() -> {
for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
System.out.println("last element\t" + ti.getThreadName() + " " + ti.getThreadState());
}
}))
.doFinally(sig -> {
s.dispose();
logger.info("Shut down all Scheduler worker threads");
latch.countDown();
})
.filter(x -> x % 1000 == 0)
.materialize()
.subscribe(x -> logger.info(x+ "**"));
latch.await();
for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
System.out.println("after cleanup\t" + ti.getThreadName() + " " + ti.getThreadState());
}
}
打印出来:
11:24:36.608 [X-2] INFO foo - onNext(1000)**
11:24:36.611 [X-2] INFO foo - onNext(2000)**
11:24:36.611 [X-2] INFO foo - onNext(3000)**
11:24:36.612 [X-2] INFO foo - onNext(4000)**
11:24:36.612 [X-2] INFO foo - onNext(5000)**
11:24:36.612 [X-2] INFO foo - onNext(6000)**
11:24:36.612 [X-2] INFO foo - onNext(7000)**
11:24:36.613 [X-2] INFO foo - onNext(8000)**
11:24:36.613 [X-2] INFO foo - onNext(9000)**
11:24:36.613 [X-2] INFO foo - onNext(10000)**
last element X-2 RUNNABLE
last element elastic-evictor-1 TIMED_WAITING
last element Monitor Ctrl-Break RUNNABLE
last element Signal Dispatcher RUNNABLE
last element Finalizer WAITING
last element Reference Handler WAITING
last element main WAITING
11:24:36.626 [X-2] INFO foo - onComplete()**
11:24:36.627 [X-2] INFO foo - Shut down all Scheduler worker threads
after cleanup Monitor Ctrl-Break RUNNABLE
after cleanup Signal Dispatcher RUNNABLE
after cleanup Finalizer WAITING
after cleanup Reference Handler WAITING
after cleanup main RUNNABLE