倒数关卡
Count down barrier
我正在寻找 Java 并发世界中的倒计时 barrier/sync:我想要一个 class 让线程 acquire()
为 总体 n
次,比屏障阻塞 t
秒(睡眠)。
在 delay/sleep 之后,应进一步处理线程。 A Semaphore does not solve the issue because all threads need to be blocked. I suppose it can be achieved with a ReentrantLock 甚至更好 compare and swap
(CAS)。 CountDownLatch 是不够的,因为我想在达到条件后重置计数。
你能在 Java 7+ 中给我一些提示吗?
我想到了这个实现:
public class CountDownBarrier {
//public static final Logger LOG = LogManager.getLogger(CountDownBarrier.class);
protected int cnt;
protected int currCnt;
protected int sleep;
protected ReentrantLock lock = null;
//protected Condition cond = null;
public CountDownBarrier(int cnt, int sleep) {
this.cnt = cnt;
this.currCnt = cnt;
this.sleep = sleep;
lock = new ReentrantLock();
//cond = lock.newCondition();
}
public void acquire() throws InterruptedException {
lock.lock();
try {
if(currCnt <= 0) {
//LOG.info("Sleep starts ###################################");
Thread.sleep(sleep);
currCnt = cnt;
//LOG.info("Sleep is over ###################################");
}
currCnt--;
} finally {
lock.unlock();
}
}
}
我的测试用例如下所示:
public class CountDownBarrierTest {
public static final Logger LOG = LogManager.getLogger(CountDownBarrierTest.class);
public static int ITER = 5;
@Test
public void test1() throws InterruptedException {
LOG.info("CountDownBarrierTest.test1()");
CountDownBarrier barrier = new CountDownBarrier(4, 5000);
List<Thread> tList = new ArrayList<Thread>();
for(int i = 0; i < 3; i++) {
TheAction x = new TheAction(i, barrier);
tList.add(new Thread(x));
}
LOG.info("Start all threads");
for(Thread t : tList) {
t.start();
}
for(Thread t : tList) {
t.join();
}
LOG.info("All threads finished");
}
private class TheAction
implements Runnable {
private int id;
private CountDownBarrier barrier;
public TheAction(int id, CountDownBarrier barrier) {
this.id = id;
this.barrier = barrier;
}
@Override
public void run() {
try {
for(int i = 0; i < ITER; i++) {
barrier.acquire();
LOG.info("#" + id + ": Action!!");
Thread.sleep(1000);
//LOG.info("#" + id + ": ------------");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出可能如下所示:
15:38:46.926 [main] [INFO] CountDownBarrierTest - CountDownBarrierTest.test1()
15:38:46.933 [main] [INFO] CountDownBarrierTest- Start all threads
15:38:46.934 [Thread-1] [INFO] CountDownBarrierTest$TheAction - #0: Action!!
15:38:46.934 [Thread-2] [INFO] CountDownBarrierTest$TheAction - #1: Action!!
15:38:46.934 [Thread-3] [INFO] CountDownBarrierTest$TheAction - #2: Action!!
15:38:47.934 [Thread-1] [INFO] CountDownBarrierTest$TheAction - #0: Action!!
15:38:47.934 [Thread-3] [INFO] CountDownBarrier - Sleep starts ###################################
15:38:52.935 [Thread-3] [INFO] CountDownBarrier - Sleep is over ###################################
15:38:52.935 [Thread-3] [INFO] CountDownBarrierTest$TheAction - #2: Action!!
15:38:52.935 [Thread-1] [INFO] CountDownBarrierTest$TheAction - #0: Action!!
15:38:52.935 [Thread-2] [INFO] CountDownBarrierTest$TheAction - #1: Action!!
15:38:53.935 [Thread-1] [INFO] CountDownBarrierTest$TheAction - #0: Action!!
15:38:53.935 [Thread-3] [INFO] CountDownBarrier - Sleep starts ###################################
15:38:58.935 [Thread-3] [INFO] CountDownBarrier - Sleep is over ###################################
15:38:58.935 [Thread-3] [INFO] CountDownBarrierTest$TheAction - #2: Action!!
15:38:58.935 [Thread-2] [INFO] CountDownBarrierTest$TheAction - #1: Action!!
15:38:58.935 [Thread-1] [INFO] CountDownBarrierTest$TheAction - #0: Action!!
15:38:59.935 [Thread-3] [INFO] CountDownBarrierTest$TheAction - #2: Action!!
15:38:59.935 [Thread-2] [INFO] CountDownBarrier - Sleep starts ###################################
15:39:04.936 [Thread-2] [INFO] CountDownBarrier - Sleep is over ###################################
15:39:04.936 [Thread-2] [INFO] CountDownBarrierTest$TheAction - #1: Action!!
15:39:04.936 [Thread-3] [INFO] CountDownBarrierTest$TheAction - #2: Action!!
15:39:05.936 [Thread-2] [INFO] CountDownBarrierTest$TheAction - #1: Action!!
15:39:06.936 [main] [INFO] CountDownBarrierTest - All threads finished
我正在寻找 Java 并发世界中的倒计时 barrier/sync:我想要一个 class 让线程 acquire()
为 总体 n
次,比屏障阻塞 t
秒(睡眠)。
在 delay/sleep 之后,应进一步处理线程。 A Semaphore does not solve the issue because all threads need to be blocked. I suppose it can be achieved with a ReentrantLock 甚至更好 compare and swap
(CAS)。 CountDownLatch 是不够的,因为我想在达到条件后重置计数。
你能在 Java 7+ 中给我一些提示吗?
我想到了这个实现:
public class CountDownBarrier {
//public static final Logger LOG = LogManager.getLogger(CountDownBarrier.class);
protected int cnt;
protected int currCnt;
protected int sleep;
protected ReentrantLock lock = null;
//protected Condition cond = null;
public CountDownBarrier(int cnt, int sleep) {
this.cnt = cnt;
this.currCnt = cnt;
this.sleep = sleep;
lock = new ReentrantLock();
//cond = lock.newCondition();
}
public void acquire() throws InterruptedException {
lock.lock();
try {
if(currCnt <= 0) {
//LOG.info("Sleep starts ###################################");
Thread.sleep(sleep);
currCnt = cnt;
//LOG.info("Sleep is over ###################################");
}
currCnt--;
} finally {
lock.unlock();
}
}
}
我的测试用例如下所示:
public class CountDownBarrierTest {
public static final Logger LOG = LogManager.getLogger(CountDownBarrierTest.class);
public static int ITER = 5;
@Test
public void test1() throws InterruptedException {
LOG.info("CountDownBarrierTest.test1()");
CountDownBarrier barrier = new CountDownBarrier(4, 5000);
List<Thread> tList = new ArrayList<Thread>();
for(int i = 0; i < 3; i++) {
TheAction x = new TheAction(i, barrier);
tList.add(new Thread(x));
}
LOG.info("Start all threads");
for(Thread t : tList) {
t.start();
}
for(Thread t : tList) {
t.join();
}
LOG.info("All threads finished");
}
private class TheAction
implements Runnable {
private int id;
private CountDownBarrier barrier;
public TheAction(int id, CountDownBarrier barrier) {
this.id = id;
this.barrier = barrier;
}
@Override
public void run() {
try {
for(int i = 0; i < ITER; i++) {
barrier.acquire();
LOG.info("#" + id + ": Action!!");
Thread.sleep(1000);
//LOG.info("#" + id + ": ------------");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出可能如下所示:
15:38:46.926 [main] [INFO] CountDownBarrierTest - CountDownBarrierTest.test1()
15:38:46.933 [main] [INFO] CountDownBarrierTest- Start all threads
15:38:46.934 [Thread-1] [INFO] CountDownBarrierTest$TheAction - #0: Action!!
15:38:46.934 [Thread-2] [INFO] CountDownBarrierTest$TheAction - #1: Action!!
15:38:46.934 [Thread-3] [INFO] CountDownBarrierTest$TheAction - #2: Action!!
15:38:47.934 [Thread-1] [INFO] CountDownBarrierTest$TheAction - #0: Action!!
15:38:47.934 [Thread-3] [INFO] CountDownBarrier - Sleep starts ###################################
15:38:52.935 [Thread-3] [INFO] CountDownBarrier - Sleep is over ###################################
15:38:52.935 [Thread-3] [INFO] CountDownBarrierTest$TheAction - #2: Action!!
15:38:52.935 [Thread-1] [INFO] CountDownBarrierTest$TheAction - #0: Action!!
15:38:52.935 [Thread-2] [INFO] CountDownBarrierTest$TheAction - #1: Action!!
15:38:53.935 [Thread-1] [INFO] CountDownBarrierTest$TheAction - #0: Action!!
15:38:53.935 [Thread-3] [INFO] CountDownBarrier - Sleep starts ###################################
15:38:58.935 [Thread-3] [INFO] CountDownBarrier - Sleep is over ###################################
15:38:58.935 [Thread-3] [INFO] CountDownBarrierTest$TheAction - #2: Action!!
15:38:58.935 [Thread-2] [INFO] CountDownBarrierTest$TheAction - #1: Action!!
15:38:58.935 [Thread-1] [INFO] CountDownBarrierTest$TheAction - #0: Action!!
15:38:59.935 [Thread-3] [INFO] CountDownBarrierTest$TheAction - #2: Action!!
15:38:59.935 [Thread-2] [INFO] CountDownBarrier - Sleep starts ###################################
15:39:04.936 [Thread-2] [INFO] CountDownBarrier - Sleep is over ###################################
15:39:04.936 [Thread-2] [INFO] CountDownBarrierTest$TheAction - #1: Action!!
15:39:04.936 [Thread-3] [INFO] CountDownBarrierTest$TheAction - #2: Action!!
15:39:05.936 [Thread-2] [INFO] CountDownBarrierTest$TheAction - #1: Action!!
15:39:06.936 [main] [INFO] CountDownBarrierTest - All threads finished