如何在没有队列的情况下实现这种并发结构?
How can I implement this concurrent structure without the queue?
我的应用程序中有一种情况,事件进入并且处理它们的线程(信号线程)必须向另一个线程(工作线程)发出信号,到目前为止处于空闲状态,它可以 运行 一些代码。一旦工作线程完成,它应该等待再次发出信号。工作线程正在工作时,事件可能会到达。在这种情况下,它应该继续并立即继续工作。工作线程的一个动作足以处理任何数量的传入事件,因此无需为每个事件工作一次,只需在每个事件后尽快工作一次即可。正确行为示例:
event comes in
worker thread starts work
worker thread finishes work
event comes in
worker thread starts work
event comes in
event comes in
worker thread finishes work
worker thread starts work
worker thread finishes work
4 个事件,3 个工作时段。这是一个不幸但不可避免的要求,即信号线程在处理事件时不能阻塞。目前,我已经使用 BlockingQueue 实现了这一点,即使内容不感兴趣甚至看不到,它也会产生填充自身的毫无意义的副作用。我希望能够使用 CountDownLatch 或 CyclicBarrier 或类似工具来完成这项工作,但我一直找不到方法。这是我的实现:
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class Main {
private static final class MyBarrier {
private BlockingQueue<Boolean> queue = new LinkedBlockingQueue<>();
void await() throws InterruptedException {
queue.take();
queue.clear();
}
void signal() {
queue.add(true);
}
}
private static Random random = new Random(0);
private static void sleepForMax(int maxMillis) {
sleep(random.nextInt(maxMillis));
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
MyBarrier myBarrier = new MyBarrier();
final ExecutorService singallingThread = Executors.newSingleThreadExecutor();
singallingThread.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
sleepForMax(1_000); // simulate period between events arriving
myBarrier.signal();
System.out.println("Signalling work to be done");
}
System.out.println("Thread interrupted");
});
final ExecutorService workingThread = Executors.newSingleThreadExecutor();
workingThread.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
System.out.println("Waiting for work");
myBarrier.await();
} catch (InterruptedException e) {
break;
}
System.out.println("Doing work...");
sleepForMax(3_000); // simulate work being done
System.out.println("Work done");
}
System.out.println("Thread interrupted");
});
sleep(10_000);
singallingThread.shutdownNow();
workingThread.shutdownNow();
}
}
执行此操作的更好方法是什么?
我正在对此进行试验,使用 java.util.concurrent.Phaser,这可能会起作用,但我之前没有使用过 Phaser,所以我不确定。
private static final class MyBarrier2 {
private Phaser phaser = new Phaser(1);
void await() throws InterruptedException {
phaser.awaitAdvanceInterruptibly(phaser.getPhase());
}
void signal() {
phaser.arrive();
}
}
当我 运行 你的代码和你使用 Phaser 的实现时,改变了睡眠时间,使信号每 800 毫秒发生一次,处理需要 1000 毫秒,我得到例如此输出:
00008: Waiting for work
00808: Signalling work to be done
00808: Doing work... <-- worker starts working
01608: Signalling work to be done <-- signal came, so there's more work
01808: Work done
01809: Waiting for work <-- waits for work...
02409: Signalling work to be done <-- ...for 600 ms, until the next signal
02409: Doing work...
(左边的数字是自开始以来的毫秒数。此外,您可以使用带有随机延迟的代码重现它,但这更难重现并在那里看到。)
如果我没理解错的话,这是错误的。例如。想象一下如果信号停止出现会发生什么。
您的代码可能可以针对您的特定情况进行此调整:
private static final class MyBarrierWithPhaser {
private final Phaser phaser = new Phaser(1);
private int lastObservedPhase; // Phaser has initial phase 0
void await() throws InterruptedException {
// only works for 1 producer 1 worker; lastObservedPhase is kind of thread-local
lastObservedPhase = phaser.awaitAdvanceInterruptibly(lastObservedPhase);
}
void signal() {
phaser.arrive();
}
}
有了这个,worker 记录它前进到的最后一个阶段,如果信号线程 "arrives" 在下一个 awaitAdvanceInterruptibly
之前,那么 Phaser 阶段得到更新,当 worker 试图等待时使用陈旧的阶段,它将立即进行;如果信号线程在 awaitAdvanceInterruptibly
之前没有到达,那么 worker 将等待直到信号线程最终到达。
使用更简单的同步原语,我可以想到如何使用synchronized
-wait()
-notify()
机制来实现:
private static final class MyBarrierWithSynchronized {
private boolean hasWork = false;
synchronized void await() throws InterruptedException {
while (!hasWork) {
wait();
}
hasWork = false;
}
synchronized void signal() {
hasWork = true;
notifyAll(); // or notify() if we are sure there is 1 signal thread and 1 worker thread
}
}
它有几个缺点:await()
如果线程正在等待进入它,则不会被中断。另外,有些人不喜欢在 this
上同步,为了简短起见,我保留了它。这可以使用 java.util.concurrent.*
类似物重写,这个实现不会有这两个缺点:
private static final class MyBarrierWithLock {
private boolean hasWorkFlag = false;
private final Lock lock = new ReentrantLock();
private final Condition hasWorkCond = lock.newCondition();
void await() throws InterruptedException {
lock.lockInterruptibly();
try {
while (!hasWorkFlag) {
hasWorkCond.await();
}
hasWorkFlag = false;
} finally {
lock.unlock();
}
}
void signal() {
lock.lock();
try {
hasWorkFlag = true;
hasWorkCond.signalAll(); // or signal() if we are sure there is 1 signal thread and 1 worker thread
} finally {
lock.unlock();
}
}
}
我的应用程序中有一种情况,事件进入并且处理它们的线程(信号线程)必须向另一个线程(工作线程)发出信号,到目前为止处于空闲状态,它可以 运行 一些代码。一旦工作线程完成,它应该等待再次发出信号。工作线程正在工作时,事件可能会到达。在这种情况下,它应该继续并立即继续工作。工作线程的一个动作足以处理任何数量的传入事件,因此无需为每个事件工作一次,只需在每个事件后尽快工作一次即可。正确行为示例:
event comes in
worker thread starts work
worker thread finishes work
event comes in
worker thread starts work
event comes in
event comes in
worker thread finishes work
worker thread starts work
worker thread finishes work
4 个事件,3 个工作时段。这是一个不幸但不可避免的要求,即信号线程在处理事件时不能阻塞。目前,我已经使用 BlockingQueue 实现了这一点,即使内容不感兴趣甚至看不到,它也会产生填充自身的毫无意义的副作用。我希望能够使用 CountDownLatch 或 CyclicBarrier 或类似工具来完成这项工作,但我一直找不到方法。这是我的实现:
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class Main {
private static final class MyBarrier {
private BlockingQueue<Boolean> queue = new LinkedBlockingQueue<>();
void await() throws InterruptedException {
queue.take();
queue.clear();
}
void signal() {
queue.add(true);
}
}
private static Random random = new Random(0);
private static void sleepForMax(int maxMillis) {
sleep(random.nextInt(maxMillis));
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
MyBarrier myBarrier = new MyBarrier();
final ExecutorService singallingThread = Executors.newSingleThreadExecutor();
singallingThread.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
sleepForMax(1_000); // simulate period between events arriving
myBarrier.signal();
System.out.println("Signalling work to be done");
}
System.out.println("Thread interrupted");
});
final ExecutorService workingThread = Executors.newSingleThreadExecutor();
workingThread.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
System.out.println("Waiting for work");
myBarrier.await();
} catch (InterruptedException e) {
break;
}
System.out.println("Doing work...");
sleepForMax(3_000); // simulate work being done
System.out.println("Work done");
}
System.out.println("Thread interrupted");
});
sleep(10_000);
singallingThread.shutdownNow();
workingThread.shutdownNow();
}
}
执行此操作的更好方法是什么?
我正在对此进行试验,使用 java.util.concurrent.Phaser,这可能会起作用,但我之前没有使用过 Phaser,所以我不确定。
private static final class MyBarrier2 {
private Phaser phaser = new Phaser(1);
void await() throws InterruptedException {
phaser.awaitAdvanceInterruptibly(phaser.getPhase());
}
void signal() {
phaser.arrive();
}
}
当我 运行 你的代码和你使用 Phaser 的实现时,改变了睡眠时间,使信号每 800 毫秒发生一次,处理需要 1000 毫秒,我得到例如此输出:
00008: Waiting for work
00808: Signalling work to be done
00808: Doing work... <-- worker starts working
01608: Signalling work to be done <-- signal came, so there's more work
01808: Work done
01809: Waiting for work <-- waits for work...
02409: Signalling work to be done <-- ...for 600 ms, until the next signal
02409: Doing work...
(左边的数字是自开始以来的毫秒数。此外,您可以使用带有随机延迟的代码重现它,但这更难重现并在那里看到。)
如果我没理解错的话,这是错误的。例如。想象一下如果信号停止出现会发生什么。
您的代码可能可以针对您的特定情况进行此调整:
private static final class MyBarrierWithPhaser {
private final Phaser phaser = new Phaser(1);
private int lastObservedPhase; // Phaser has initial phase 0
void await() throws InterruptedException {
// only works for 1 producer 1 worker; lastObservedPhase is kind of thread-local
lastObservedPhase = phaser.awaitAdvanceInterruptibly(lastObservedPhase);
}
void signal() {
phaser.arrive();
}
}
有了这个,worker 记录它前进到的最后一个阶段,如果信号线程 "arrives" 在下一个 awaitAdvanceInterruptibly
之前,那么 Phaser 阶段得到更新,当 worker 试图等待时使用陈旧的阶段,它将立即进行;如果信号线程在 awaitAdvanceInterruptibly
之前没有到达,那么 worker 将等待直到信号线程最终到达。
使用更简单的同步原语,我可以想到如何使用synchronized
-wait()
-notify()
机制来实现:
private static final class MyBarrierWithSynchronized {
private boolean hasWork = false;
synchronized void await() throws InterruptedException {
while (!hasWork) {
wait();
}
hasWork = false;
}
synchronized void signal() {
hasWork = true;
notifyAll(); // or notify() if we are sure there is 1 signal thread and 1 worker thread
}
}
它有几个缺点:await()
如果线程正在等待进入它,则不会被中断。另外,有些人不喜欢在 this
上同步,为了简短起见,我保留了它。这可以使用 java.util.concurrent.*
类似物重写,这个实现不会有这两个缺点:
private static final class MyBarrierWithLock {
private boolean hasWorkFlag = false;
private final Lock lock = new ReentrantLock();
private final Condition hasWorkCond = lock.newCondition();
void await() throws InterruptedException {
lock.lockInterruptibly();
try {
while (!hasWorkFlag) {
hasWorkCond.await();
}
hasWorkFlag = false;
} finally {
lock.unlock();
}
}
void signal() {
lock.lock();
try {
hasWorkFlag = true;
hasWorkCond.signalAll(); // or signal() if we are sure there is 1 signal thread and 1 worker thread
} finally {
lock.unlock();
}
}
}