Parking threads in service
I'm experimenting with thread parking and decided to build a small service around it. Here is what it looks like:
public class TestService {
    private static final Logger logger = LoggerFactory.getLogger(TestService.class); // logback I think this logger causes some troubles
    private final CountDownLatch stopLatch;
    private final Object parkBlocker = new Object();
    private volatile boolean stopped;
    private final Thread[] workers;

    public TestService(int parallelizm) {
        stopLatch = new CountDownLatch(parallelizm);
        workers = new Thread[parallelizm];
        for (int i = 0; i < parallelizm; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (!stopped) {
                        logger.debug("Parking " + Thread.currentThread().getName());
                        LockSupport.park(parkBlocker);
                        logger.debug(Thread.currentThread().getName() + " unparked");
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
        }
    }

    public void start() {
        Arrays.stream(workers).forEach(t -> {
            t.start();
            logger.debug(t.getName() + " started");
        });
    }

    public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        boolean stoppedSuccefully = false;
        this.stopped = true;
        unparkWorkers();
        if (stopLatch.await(timeout, unit)) {
            stoppedSuccefully = true;
        }
        return stoppedSuccefully;
    }

    private void unparkWorkers() {
        Arrays.stream(workers).forEach(w -> {
            LockSupport.unpark(w);
            logger.debug("Un-park call is done on " + w.getName());
        });
    }
}
The problem I ran into is that if I test this service as follows:
public static void main(String[] args) throws InterruptedException {
    while (true) {
        TestService service = new TestService(2);
        service.start();
        if (!service.stop(10000, TimeUnit.MILLISECONDS))
            throw new RuntimeException();
    }
}
I sometimes get the following behavior:
14:58:55.226 [main] DEBUG com.pack.age.TestService - Thread-648 started
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Parking Thread-648
14:58:55.227 [main] DEBUG com.pack.age.TestService - Thread-649 started
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-648
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Thread-648 unparked
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-649
14:58:55.227 [Thread-649] DEBUG com.pack.age.TestService - Parking Thread-649
Exception in thread "main" java.lang.RuntimeException
at com.pack.age.Test$.main(Test.scala:12)
at com.pack.age.Test.main(Test.scala)
The thread is hanging in park:
"Thread-649" #659 prio=5 os_prio=0 tid=0x00007efe4433f000 nid=0x7691 waiting on condition [0x00007efe211c8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000720739a68> (a java.lang.Object)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at com.pack.age.TestService.lambda$new$0(TestService.java:27)
at com.pack.age.TestService$$Lambda/1327763628.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
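I don't see any park-unpark race in the service. Moreover, if unpark is called before park, park is guaranteed not to block (that's what the javadocs say).
Maybe I am misusing LockSupport::park. Can you suggest a fix?

This has nothing to do with the logger, although using it brings the problem to the surface. You have a race condition, as simple as that. Before explaining the race, you need to understand a few things from the LockSupport::unpark documentation: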
Makes available the permit for the given thread, if it was not already available. If the thread was blocked on park then it will unblock. Otherwise, its next call to park is guaranteed not to block.
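The first point follows from that passage. The short version: if a thread has already started but has not yet called park, and during that window (between start and park) some other thread calls unpark on it, then the first thread will not park at all. The permit is available immediately. This small drawing may make it clearer: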
(ThreadA) start ------------------ park --------- ....
(ThreadB) start ----- unpark -----
Notice how ThreadB calls unpark(ThreadA) between the moment ThreadA starts and the moment it calls park. So when ThreadA reaches park, it is guaranteed not to block, exactly as the documentation says.
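The first case can be reproduced deterministically with a small sketch. It is not part of the original answer: the class name PermitDemo and the method parkAfterUnparkReturns are made up for illustration, and the worker busy-waits on purpose because a blocking primitive could park internally and consume the permit being observed.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not taken from the question or the answer.
class PermitDemo {

    /** Returns true if the worker got past park() within the timeout. */
    static boolean parkAfterUnparkReturns() {
        AtomicBoolean started = new AtomicBoolean();
        AtomicBoolean unparkDone = new AtomicBoolean();

        Thread worker = new Thread(() -> {
            started.set(true);
            // Busy-wait instead of blocking, so nothing else touches the permit.
            while (!unparkDone.get()) {
                Thread.yield();
            }
            // The permit was handed out earlier, so this call should not block.
            LockSupport.park();
        });
        worker.setDaemon(true); // never keep the JVM alive if something goes wrong
        worker.start();

        while (!started.get()) {
            Thread.yield(); // wait until the worker is really running
        }
        LockSupport.unpark(worker); // worker has started but has not parked yet
        unparkDone.set(true);

        try {
            worker.join(TimeUnit.SECONDS.toMillis(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("park returned: " + parkAfterUnparkReturns());
    }
}
```

Keep in mind that park is also allowed to return spuriously, so a return from park on its own never proves that a permit was present. That is why real code re-checks its condition in a loop, as the service in the question does.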
The second point from the same documentation is:
This operation is not guaranteed to have any effect at all if the given thread has not been started.
Let's look at it as a drawing:
Thread B calls unpark(ThreadA) --- Thread A starts --- Thread A calls park
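After ThreadA calls park, it will hang forever, because ThreadB never calls unpark on it again. Notice that the call to unpark was made before ThreadA had started (unlike the previous example).
And this is exactly what happens in your case:
LockSupport.unpark(w); (from unparkWorkers) was called before t.start(); (from public void start(){...}). In simpler words: your code calls unpark on the workers before they have even started, so when they finally reach park they are stuck, and nobody is left to unpark them. The fact that you see this with logger and not with System::out most probably has to do with the fact that println goes through a synchronized method under the hood.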
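The question also asks for a fix, which the explanation above does not spell out. One possible approach, offered here as an assumption rather than as the answer's own recommendation, is to make stop() tolerant of a lost wake-up by repeating the unpark calls until the latch reaches zero. The names RetryingStopService, RETRY_MILLIS and stressTest are hypothetical, and logging is left out to keep the sketch self-contained.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical variant of the question's service; a sketch, not a definitive fix.
class RetryingStopService {
    private static final long RETRY_MILLIS = 10; // assumed retry interval

    private final CountDownLatch stopLatch;
    private final Thread[] workers;
    private volatile boolean stopped;

    RetryingStopService(int parallelism) {
        stopLatch = new CountDownLatch(parallelism);
        workers = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (!stopped) {
                        LockSupport.park(this);
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
            workers[i].setDaemon(true);
        }
    }

    void start() {
        for (Thread t : workers) {
            t.start();
        }
    }

    boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        stopped = true;
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        do {
            // Unpark on every pass: a wake-up that got lost is simply sent again.
            for (Thread w : workers) {
                LockSupport.unpark(w);
            }
            if (stopLatch.await(RETRY_MILLIS, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } while (System.nanoTime() - deadline < 0);
        return false;
    }

    /** Starts and stops a fresh two-worker service repeatedly; true if every stop succeeded. */
    static boolean stressTest(int rounds) {
        try {
            for (int i = 0; i < rounds; i++) {
                RetryingStopService service = new RetryingStopService(2);
                service.start();
                if (!service.stop(10, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all rounds stopped: " + stressTest(200));
    }
}
```

The design choice is that unpark is cheap and harmless to repeat, so stop() does not need to know why a wake-up was missed. The worker loop stays as in the question: it re-checks stopped after every return from park.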
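In fact, LockSupport offers exactly the semantics needed to prove this. For that we need a blocker class that carries a flag (and, for simplicity, a single worker: SOProblem service = new SOProblem(1); where SOProblem stands for the question's TestService):
static class ParkBlocker {
    private volatile int x;

    public ParkBlocker(int x) {
        this.x = x;
    }

    public int getX() {
        return x;
    }
}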
Now we need to plug it into the right methods. First, flag the fact that we have called unpark:
private void unparkWorkers() {
    Arrays.stream(workers).forEach(w -> {
        LockSupport.unpark(w);
        logger.debug("Un-park call is done on " + w.getName());
    });
    /*
     * add "1" to whatever there is already in pb.x, meaning
     * we have done unparking _also_
     */
    int y = pb.x;
    y = y + 1;
    pb.x = y;
}
Then reset the flag after a cycle has completed:
public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
    boolean stoppedSuccefully = false;
    stopped = true;
    unparkWorkers();
    if (stopLatch.await(timeout, unit)) {
        stoppedSuccefully = true;
        // reset the flag
        pb.x = 0;
    }
    return stoppedSuccefully;
}
Then change the constructor to flag that the thread has started:
.....
while (!stopped) {
    logger.debug("Parking " + Thread.currentThread().getName());
    // flag the fact that thread has started. add "2", meaning
    // thread has started
    int y = pb.x;
    y = y + 2;
    pb.x = y;
    LockSupport.park(pb);
    logger.debug(Thread.currentThread().getName() + " unparked");
}
Then, when your thread freezes, you need to inspect the flag:
public static void main(String[] args) throws InterruptedException {
    while (true) {
        SOProblem service = new SOProblem(1); // <-- notice a single worker, for simplicity
        service.start();
        if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
            service.debug();
            throw new RuntimeException();
        }
    }
}
where the debug method is:
public void debug() {
    Arrays.stream(workers)
          .forEach(x -> {
              ParkBlocker pb = (ParkBlocker) LockSupport.getBlocker(x);
              if (pb != null) {
                  System.out.println("x = " + pb.getX());
              }
          });
}
Whenever the issue reproduces, unpark was called before park. You can tell because the output is x = 3.
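The debug method above relies on LockSupport.getBlocker, which returns the blocker object passed to the most recent park call of a thread that is still blocked, or null otherwise. A minimal sketch of just that mechanism, with the hypothetical names BlockerInspection and blockerIsVisibleWhileParked, could look like this:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; it only illustrates LockSupport.getBlocker.
class BlockerInspection {

    /** Returns true if another thread could read the worker's blocker while it was parked. */
    static boolean blockerIsVisibleWhileParked() {
        Object blocker = new Object();
        AtomicBoolean release = new AtomicBoolean();

        Thread worker = new Thread(() -> {
            // Re-check the condition in a loop, since park may return spuriously.
            while (!release.get()) {
                LockSupport.park(blocker);
            }
        });
        worker.setDaemon(true);
        worker.start();

        // Poll until the worker is parked; getBlocker returns null before that.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        Object seen = null;
        while (seen == null && System.nanoTime() - deadline < 0) {
            seen = LockSupport.getBlocker(worker);
            Thread.yield();
        }

        // Let the worker finish before reporting the result.
        release.set(true);
        LockSupport.unpark(worker);
        return seen == blocker;
    }

    public static void main(String[] args) {
        System.out.println("blocker visible: " + blockerIsVisibleWhileParked());
    }
}
```

Because the blocker is an ordinary object, it can carry diagnostic state such as the x counter used above, which is what makes it possible to inspect a frozen thread from the outside.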