Parking threads in service

I was experimenting with thread parking and decided to build a kind of service. Here is what it looks like:

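import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestService {
    private static final Logger logger = LoggerFactory.getLogger(TestService.class); // logback; I think this logger causes some trouble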

    private final CountDownLatch stopLatch;
    private final Object parkBlocker = new Object();
    private volatile boolean stopped;
    private final Thread[] workers;

    public TestService(int parallelizm) {
        stopLatch = new CountDownLatch(parallelizm);
        workers = new Thread[parallelizm];
        for (int i = 0; i < parallelizm; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (!stopped) {
                        logger.debug("Parking " + Thread.currentThread().getName());
                        LockSupport.park(parkBlocker);
                        logger.debug(Thread.currentThread().getName() + " unparked");
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
        }
    }

    public void start() {
        Arrays.stream(workers).forEach(t -> {
            t.start();
            logger.debug(t.getName() + " started");
        });
    }

    public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        boolean stoppedSuccefully = false;
        this.stopped = true;
        unparkWorkers();
        if (stopLatch.await(timeout, unit)) {
            stoppedSuccefully = true;
        }
        return stoppedSuccefully;
    }

    private void unparkWorkers() {
        Arrays.stream(workers).forEach(w -> {
            LockSupport.unpark(w);
            logger.debug("Un-park call is done on " + w.getName());
        });
    }
}

The problem is that when I test this service as follows:

public static void main(String[] args) throws InterruptedException {
  while(true) {
    TestService service = new TestService(2);
    service.start();
    if (!service.stop(10000, TimeUnit.MILLISECONDS))
      throw new RuntimeException();
  }
}

I sometimes get the following behavior:

14:58:55.226 [main] DEBUG com.pack.age.TestService - Thread-648 started
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Parking Thread-648
14:58:55.227 [main] DEBUG com.pack.age.TestService - Thread-649 started
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-648
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Thread-648 unparked
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-649
14:58:55.227 [Thread-649] DEBUG com.pack.age.TestService - Parking Thread-649
Exception in thread "main" java.lang.RuntimeException
    at com.pack.age.Test$.main(Test.scala:12)
    at com.pack.age.Test.main(Test.scala)

The thread stays parked:

"Thread-649" #659 prio=5 os_prio=0 tid=0x00007efe4433f000 nid=0x7691 waiting on condition [0x00007efe211c8000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000720739a68> (a java.lang.Object)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at com.pack.age.TestService.lambda$new$0(TestService.java:27)
    at com.pack.age.TestService$$Lambda/1327763628.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:748)

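I don't see any park/unpark race in the service. Moreover, if unpark is called before park, park is guaranteed not to block (that is what the javadocs say).

Maybe I am misusing LockSupport::park. Can you suggest a fix?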

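This has nothing to do with the logger, although using it brings the problem to the surface. You have a race condition, as simple as that. Before explaining that race, you need to understand a few things from the LockSupport::unpark documentation: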

Makes available the permit for the given thread, if it was not already available. If the thread was blocked on park then it will unblock. Otherwise, its next call to park is guaranteed not to block.
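The "guaranteed not to block" part of this quote can be checked in a single thread. A minimal sketch (PermitDemo is a hypothetical name): the thread grants itself the permit and then parks. parkNanos with a generous 5 s timeout is used only so the demo cannot hang if that assumption were wrong.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class PermitDemo {
    /** Returns how long (ms) a park took when the permit had been granted beforehand. */
    static long parkAfterSelfUnpark() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self); // make the permit available first
        long start = System.nanoTime();
        // The permit is already available, so this consumes it and returns at once;
        // the timeout only guards against hanging if that were not the case.
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("park returned after " + parkAfterSelfUnpark() + " ms");
    }
}
```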

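The first point is explained here. The short version is: if a thread has already been started but has not yet called park, and during that window (between start and park) some other thread calls unpark on it, then that thread will not park at all, because the permit is already available. This small diagram may make it clearer: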

(ThreadA)  start ------------------ park --------- ....

(ThreadB)  start ----- unpark -----

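Notice how ThreadB calls unpark(ThreadA) between the moment ThreadA calls start and the moment it calls park. So when ThreadA reaches park, it is guaranteed not to block, just as the documentation says.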
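The diagram above can be reproduced with a small program, sketched under these assumptions: UnparkBeforeParkDemo is a hypothetical name, the main thread plays ThreadB, and ThreadA busy-waits on a flag instead of waiting on a latch, because a CountDownLatch blocks via park internally and could consume the very permit being demonstrated. Since unpark happens after start but before park, the timed park should return almost immediately.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class UnparkBeforeParkDemo {
    /** Returns how long (ms) ThreadA's park took after it was unparked early. */
    static long run() {
        final long timeoutNanos = TimeUnit.SECONDS.toNanos(5);
        final AtomicBoolean unparkCalled = new AtomicBoolean(false);
        final long[] parkMillis = {-1};

        Thread threadA = new Thread(() -> {
            // Busy-wait instead of a latch: a latch blocks via park internally
            // and could consume the permit under test.
            while (!unparkCalled.get()) {
                Thread.yield();
            }
            long start = System.nanoTime();
            LockSupport.parkNanos(timeoutNanos); // permit already available
            parkMillis[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        });

        threadA.start();             // ThreadA: start
        LockSupport.unpark(threadA); // ThreadB (main): unpark, after start, before park
        unparkCalled.set(true);      // only now may ThreadA go on to park

        try {
            threadA.join();          // join() also makes parkMillis[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return parkMillis[0];
    }

    public static void main(String[] args) {
        System.out.println("ThreadA's park returned after " + run() + " ms");
    }
}
```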

The second point from the same documentation is:

This operation is not guaranteed to have any effect at all if the given thread has not been started.

Let's look at a diagram:

Thread B calls unpark(ThreadA) --- Thread A starts --- Thread A calls park 

After ThreadA calls park, it will hang forever, because ThreadB never calls unpark again. Note that the call to unpark was made before ThreadA started (unlike the previous example).
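This second diagram can be sketched the same way (UnparkBeforeStartDemo is a hypothetical name). Because the javadoc only says that unpark on an unstarted thread is "not guaranteed to have any effect", the sketch uses a timed park so it cannot hang, and it makes no claim about the result. On HotSpot the early permit is typically dropped, so the park usually runs for about the full timeout, but that is implementation behavior, not a guarantee.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class UnparkBeforeStartDemo {
    /**
     * Calls unpark on a thread that has NOT been started yet, then starts it and
     * lets it park with a timeout. Returns how long that park took, in ms.
     */
    static long run(long timeoutMillis) {
        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        final long[] elapsed = {-1};

        Thread threadA = new Thread(() -> {
            long start = System.nanoTime();
            // Timed park so the demo cannot hang forever if the permit was lost.
            LockSupport.parkNanos(timeoutNanos);
            elapsed[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        });

        LockSupport.unpark(threadA); // ThreadB: unpark BEFORE ThreadA starts
        threadA.start();             // ThreadA: start, then park

        try {
            threadA.join();          // join() makes elapsed[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return elapsed[0];
    }

    public static void main(String[] args) {
        System.out.println("park after early unpark took " + run(200) + " ms");
    }
}
```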

That is exactly your case:

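LockSupport.unpark(w); (from unparkWorkers) is called before t.start(); (from public void start(){...}). In short, your code calls unpark on the workers before they have even started, so when they finally reach park they are stuck, and nobody is left to unpark them. The fact that you see this with a logger and not with System::out most likely has to do with the fact that println goes through a synchronized method under the hood.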
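One possible fix, sketched here rather than taken from the answer above: stop relying on a single permit reaching the exact park call. Each worker parks with a timeout and re-checks the volatile stopped flag, so a lost permit (unpark on a thread that has not really started, or any other park call on the worker thread consuming the permit first) only delays shutdown by one timeout. RobustParkingService and RECHECK_NANOS are hypothetical names, and 50 ms is an arbitrary trade-off between shutdown latency and idle wake-ups.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class RobustParkingService {
    // Hypothetical upper bound on how long a lost permit can delay shutdown.
    private static final long RECHECK_NANOS = TimeUnit.MILLISECONDS.toNanos(50);

    private final CountDownLatch stopLatch;
    private final Thread[] workers;
    private volatile boolean stopped;

    RobustParkingService(int parallelism) {
        stopLatch = new CountDownLatch(parallelism);
        workers = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (!stopped) {
                        // Timed park: even if the permit from unpark() was lost,
                        // `stopped` is re-checked after at most RECHECK_NANOS.
                        LockSupport.parkNanos(this, RECHECK_NANOS);
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
        }
    }

    void start() {
        Arrays.stream(workers).forEach(Thread::start);
    }

    boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        stopped = true;
        // Fast path: wake workers that are parked right now.
        Arrays.stream(workers).forEach(LockSupport::unpark);
        return stopLatch.await(timeout, unit);
    }
}
```

It can be dropped into the same start/stop loop as the question's main; in the worst case each stop() now takes about one RECHECK_NANOS period instead of hanging.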


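In fact, LockSupport provides the semantics needed to prove this. For that we need a custom blocker class (and, for simplicity, a single worker: SOProblem service = new SOProblem(1);):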

static class ParkBlocker {

    private volatile int x;

    public ParkBlocker(int x) {
        this.x = x;
    }

    public int getX() {
        return x;
    }
}

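Now we need to plug it into the right methods, where pb is a ParkBlocker instance held in a field of the service. First, flag the fact that we called unpark: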

private void unparkWorkers() {
    Arrays.stream(workers).forEach(w -> {
        LockSupport.unpark(w);
        logger.debug("Un-park call is done on " + w.getName());
    });
    /*
     * add "1" to whatever there is already in pb.x, meaning
     * we have done unparking _also_
     */
    int y = pb.x;
    y = y + 1;
    pb.x = y;
}

Then reset the flag once the workers have stopped:

public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
    boolean stoppedSuccefully = false;
    stopped = true;
    unparkWorkers();
    if (stopLatch.await(timeout, unit)) {
        stoppedSuccefully = true;
        // reset the flag
        pb.x = 0;
    }
    return stoppedSuccefully;
}

Then change the constructor to flag that the thread has started:

  .....
  while (!stopped) {
       logger.debug("Parking " + Thread.currentThread().getName());
       // flag the fact that thread has started. add "2", meaning
       // thread has started
       int y = pb.x;
       y = y + 2;
       pb.x = y;
       LockSupport.park(pb);
       logger.debug(Thread.currentThread().getName() + " unparked");
  }
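One caveat with these flag updates: each one is a separate read, add and write (int y = pb.x; y = y + 1; pb.x = y;). volatile makes each read and write visible, but it does not make the compound update atomic, so if the main thread and the worker update x at the same moment, one increment can be lost and the diagnostic becomes misleading. A minimal sketch of a variant using AtomicInteger (AtomicParkBlocker, add and reset are hypothetical names, not part of the original answer):

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicParkBlocker {
    private final AtomicInteger x;

    AtomicParkBlocker(int x) {
        this.x = new AtomicInteger(x);
    }

    /** Atomically adds delta, so a concurrent +1 (unpark) and +2 (park) are never lost. */
    void add(int delta) {
        x.addAndGet(delta);
    }

    /** Replaces the plain `pb.x = 0` reset in stop(). */
    void reset() {
        x.set(0);
    }

    int getX() {
        return x.get();
    }
}
```

With it, unparkWorkers would call pb.add(1), the worker pb.add(2) just before LockSupport.park(pb), and stop() pb.reset(); debug() would then cast the blocker to AtomicParkBlocker.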

Then, when your thread freezes, check the flag:

public static void main(String[] args) throws InterruptedException {
    while (true) {
        SOProblem service = new SOProblem(1); // <-- notice a single worker, for simplicity
        service.start();
        if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
            service.debug();
            throw new RuntimeException();
        }
    }
}

where the debug method is:

public void debug() {
    Arrays.stream(workers)
          .forEach(x -> {
              ParkBlocker pb = (ParkBlocker) LockSupport.getBlocker(x);
              if (pb != null) {
                  System.out.println("x = " + pb.getX());
              }
          });
}
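debug() works because LockSupport.getBlocker returns the blocker object passed to the park call a thread is currently blocked in, or null if it is not blocked, so a frozen worker still exposes its ParkBlocker. A small sketch to see this in isolation (BlockerDemo is a hypothetical name): the helper thread loops around park because park may return spuriously, and it is stopped with interrupt, which also makes park return.

```java
import java.util.concurrent.locks.LockSupport;

class BlockerDemo {
    /** Parks a helper thread with the given blocker and returns what getBlocker reports. */
    static Object blockerSeenWhileParked(Object blocker) {
        Thread t = new Thread(() -> {
            // park() may return spuriously, so keep parking until interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park(blocker);
            }
        });
        t.start();

        Object seen = null;
        // Poll until the thread is parked and its blocker is visible.
        while (seen == null) {
            if (t.getState() == Thread.State.WAITING) {
                seen = LockSupport.getBlocker(t);
            }
            Thread.yield();
        }

        t.interrupt(); // wakes park(); the loop then exits
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        Object blocker = new Object();
        System.out.println(blockerSeenWhileParked(blocker) == blocker);
    }
}
```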

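When the problem reproduces, unpark was called before park; this is the case when the output is x = 3 (1 added by unparkWorkers plus 2 added by the worker just before it parks).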
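Because unparkWorkers() adds 1 once and the worker adds 2 every time it is about to park, x can be read as one bit plus a counter, assuming a single worker and no lost updates. A small hypothetical helper (FlagDecoder is not part of the original) that spells out this arithmetic:

```java
class FlagDecoder {
    /**
     * Decodes the diagnostic value x for a single worker:
     * the low bit is set by unparkWorkers(), and x / 2 counts how many
     * times the worker reached park.
     */
    static String describe(int x) {
        boolean unparkDone = (x & 1) == 1;
        int parkCount = x / 2;
        return "unparkWorkers done: " + unparkDone + ", park reached " + parkCount + " time(s)";
    }

    public static void main(String[] args) {
        // For the frozen case: x = 3 = 1 (unpark) + 2 (one park)
        System.out.println(describe(3));
    }
}
```

For example, describe(3) yields "unparkWorkers done: true, park reached 1 time(s)", while a value such as 5 would mean the worker looped and reached park twice.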