带有信号量的生产者-消费者陷入僵局
Producer-Consumer with semaphores gets deadlocked
我正在尝试仅使用信号量创建生产者-消费者。使用以下代码
public class Application {
public static int id = 0;
public static void main(String[] args) {
Semaphore producerSem = new Semaphore(1);
Semaphore consumerSem = new Semaphore(1);
Queue<Integer> line = new LinkedList<>();
Runnable produce = () -> {
try {
producerSem.acquire();
System.out.println(Thread.currentThread().getName() + " producing");
while (line.size() > 10) continue;
Thread.sleep(2000);
line.offer(id);
System.out.println(Thread.currentThread().getName() + " produced thing with id: " + id);
id++;
System.out.println(Thread.currentThread().getName() + " finished producing");
producerSem.release();
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable consume = () -> {
try {
consumerSem.acquire();
System.out.println(Thread.currentThread().getName() + " consuming");
while (line.size() < 1) continue;
int product = line.remove();
System.out.println(Thread.currentThread().getName() + " consumed thing with id: " + product);
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + " finished consuming");
consumerSem.release();
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 100; i++) {
new Thread(consume, "Consumer - " + i).start();
new Thread(produce, "Producer - " + i).start();
}
}
}
它陷入僵局,当它转到 id 10 并停止时我无法调试,所以它看起来像一个元素已被删除但它无法继续前进。在任何地方设置断点都可以正常工作,即使在任何任务结束时也是如此
我认为只使用信号量来编写线程安全代码是一个不错的练习。
你应该保护三个方面:
id
是一个简单的全局整数,只有一个进程应该 立即写入。
- 向队列添加或删除作业也应受到保护。
- 虽然我们可以使用队列保护来检查可用性,但等待新作业的一种优雅方式是使用同步计数器(信号量)。这样我们就不会去查询queue只是为了check。
修改后的代码可以是:
// our `id` counter...
private static int id = 0;
// ...and their semaphore for thread safety
private static Semaphore idSemaphore = new Semaphore(1);
// jobs queue...
private static Queue<Integer> lineJobs = new LinkedList<>();
// ...and their semaphore for thread safety
private static Semaphore lineSemaphore = new Semaphore(1);
// thread safe counter for (thread safety) jobs availability check
private static Semaphore lineSemaphoreAvailable = new Semaphore(0);
public static void main(String[] args) {
Runnable produce = () -> {
// we will set `myId` only once
final int myId;
// create a new id
try {
// wait to be granted to touch the global (unsafe integer) `id`
idSemaphore.acquire();
myId = id++;
} catch (InterruptedException e) {
System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
return;
} finally {
// with exception or not release the semaphore
idSemaphore.release();
}
System.out.printf("%s: producing job #%d...%n", Thread.currentThread().getName(), myId);
// producing job...
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
} catch (InterruptedException e) {
System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
return;
}
// store the job
try {
// wait to be granted to touch the global (unsafe queue) `lineJobs`
lineSemaphore.acquire();
lineJobs.add(myId);
// notify to consumers a new one job is available to peek
lineSemaphoreAvailable.release();
System.out.printf("%s: job #%d ready to do!%n", Thread.currentThread().getName(), myId);
} catch (InterruptedException e) {
System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
return;
} finally {
lineSemaphore.release();
}
};
Runnable consume = () -> {
final int myId;
// get a job
try {
// wait for a new job to become available
lineSemaphoreAvailable.acquire();
// wait to be granted to touch the global (unsafe queue) `lineJobs`
lineSemaphore.acquire();
myId = lineJobs.remove();
System.out.printf("%s: job #%d adquired%n", Thread.currentThread().getName(), myId);
} catch (InterruptedException e) {
System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
return;
} finally {
lineSemaphore.release();
}
System.out.printf("%s: job #%d finished!%n", Thread.currentThread().getName(), myId);
};
for (int i = 0; i < 100; i++) {
new Thread(consume, "Consumer - " + i).start();
new Thread(produce, "Producer - " + i).start();
}
}
有输出
Producer - 0: producing job #0...
Producer - 1: producing job #1...
...
Producer - 46: producing job #45...
Producer - 45: producing job #44...
Producer - 9: job #9 ready to do!
Producer - 44: producing job #46...
Producer - 47: producing job #47...
Producer - 48: producing job #48...
Producer - 49: producing job #49...
Consumer - 0: job #9 adquired
Consumer - 0: job #9 finished!
Producer - 14: job #14 ready to do!
Consumer - 1: job #14 adquired
Consumer - 1: job #14 finished!
Producer - 50: producing job #50...
Producer - 51: producing job #51...
...
Producer - 72: producing job #72...
Producer - 39: job #39 ready to do!
Consumer - 2: job #39 adquired
Consumer - 2: job #39 finished!
Producer - 73: producing job #73...
Producer - 74: producing job #74...
Producer - 75: producing job #75...
Producer - 10: job #10 ready to do!
Producer - 40: job #40 ready to do!
Consumer - 3: job #10 adquired
Consumer - 3: job #10 finished!
Producer - 76: producing job #76...
Producer - 77: producing job #77...
...
Producer - 82: producing job #82...
Producer - 83: producing job #83...
Producer - 84: producing job #84...
Consumer - 4: job #40 adquired
Consumer - 4: job #40 finished!
Producer - 85: producing job #85...
Producer - 86: producing job #86...
Producer - 71: job #71 ready to do!
Producer - 87: producing job #87...
Consumer - 5: job #71 adquired
Consumer - 5: job #71 finished!
Producer - 88: producing job #88...
Producer - 92: producing job #92...
Producer - 91: producing job #91...
...
Producer - 96: producing job #95...
Producer - 98: producing job #98...
Producer - 99: producing job #99...
Producer - 84: job #84 ready to do!
Consumer - 6: job #84 adquired
Consumer - 6: job #84 finished!
...
Consumer - 97: job #63 adquired
Consumer - 97: job #63 finished!
Producer - 90: job #90 ready to do!
Consumer - 98: job #90 adquired
Consumer - 98: job #90 finished!
Producer - 87: job #87 ready to do!
Consumer - 99: job #87 adquired
Consumer - 99: job #87 finished!
Process finished with exit code 0
我正在尝试仅使用信号量创建生产者-消费者。使用以下代码
public class Application {
public static int id = 0;
public static void main(String[] args) {
Semaphore producerSem = new Semaphore(1);
Semaphore consumerSem = new Semaphore(1);
Queue<Integer> line = new LinkedList<>();
Runnable produce = () -> {
try {
producerSem.acquire();
System.out.println(Thread.currentThread().getName() + " producing");
while (line.size() > 10) continue;
Thread.sleep(2000);
line.offer(id);
System.out.println(Thread.currentThread().getName() + " produced thing with id: " + id);
id++;
System.out.println(Thread.currentThread().getName() + " finished producing");
producerSem.release();
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable consume = () -> {
try {
consumerSem.acquire();
System.out.println(Thread.currentThread().getName() + " consuming");
while (line.size() < 1) continue;
int product = line.remove();
System.out.println(Thread.currentThread().getName() + " consumed thing with id: " + product);
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + " finished consuming");
consumerSem.release();
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 100; i++) {
new Thread(consume, "Consumer - " + i).start();
new Thread(produce, "Producer - " + i).start();
}
}
}
它陷入僵局,当它转到 id 10 并停止时我无法调试,所以它看起来像一个元素已被删除但它无法继续前进。在任何地方设置断点都可以正常工作,即使在任何任务结束时也是如此
我认为只使用信号量来编写线程安全代码是一个不错的练习。
你应该保护三个方面:
id
是一个简单的全局整数,只有一个进程应该 立即写入。- 向队列添加或删除作业也应受到保护。
- 虽然我们可以使用队列保护来检查可用性,但等待新作业的一种优雅方式是使用同步计数器(信号量)。这样我们就不会去查询queue只是为了check。
修改后的代码可以是:
// our `id` counter...
private static int id = 0;
// ...and their semaphore for thread safety
private static Semaphore idSemaphore = new Semaphore(1);
// jobs queue...
private static Queue<Integer> lineJobs = new LinkedList<>();
// ...and their semaphore for thread safety
private static Semaphore lineSemaphore = new Semaphore(1);
// thread safe counter for (thread safety) jobs availability check
private static Semaphore lineSemaphoreAvailable = new Semaphore(0);
public static void main(String[] args) {
Runnable produce = () -> {
// we will set `myId` only once
final int myId;
// create a new id
try {
// wait to be granted to touch the global (unsafe integer) `id`
idSemaphore.acquire();
myId = id++;
} catch (InterruptedException e) {
System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
return;
} finally {
// with exception or not release the semaphore
idSemaphore.release();
}
System.out.printf("%s: producing job #%d...%n", Thread.currentThread().getName(), myId);
// producing job...
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
} catch (InterruptedException e) {
System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
return;
}
// store the job
try {
// wait to be granted to touch the global (unsafe queue) `lineJobs`
lineSemaphore.acquire();
lineJobs.add(myId);
// notify to consumers a new one job is available to peek
lineSemaphoreAvailable.release();
System.out.printf("%s: job #%d ready to do!%n", Thread.currentThread().getName(), myId);
} catch (InterruptedException e) {
System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
return;
} finally {
lineSemaphore.release();
}
};
Runnable consume = () -> {
final int myId;
// get a job
try {
// wait for a new job to become available
lineSemaphoreAvailable.acquire();
// wait to be granted to touch the global (unsafe queue) `lineJobs`
lineSemaphore.acquire();
myId = lineJobs.remove();
System.out.printf("%s: job #%d adquired%n", Thread.currentThread().getName(), myId);
} catch (InterruptedException e) {
System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
return;
} finally {
lineSemaphore.release();
}
System.out.printf("%s: job #%d finished!%n", Thread.currentThread().getName(), myId);
};
for (int i = 0; i < 100; i++) {
new Thread(consume, "Consumer - " + i).start();
new Thread(produce, "Producer - " + i).start();
}
}
有输出
Producer - 0: producing job #0...
Producer - 1: producing job #1...
...
Producer - 46: producing job #45...
Producer - 45: producing job #44...
Producer - 9: job #9 ready to do!
Producer - 44: producing job #46...
Producer - 47: producing job #47...
Producer - 48: producing job #48...
Producer - 49: producing job #49...
Consumer - 0: job #9 adquired
Consumer - 0: job #9 finished!
Producer - 14: job #14 ready to do!
Consumer - 1: job #14 adquired
Consumer - 1: job #14 finished!
Producer - 50: producing job #50...
Producer - 51: producing job #51...
...
Producer - 72: producing job #72...
Producer - 39: job #39 ready to do!
Consumer - 2: job #39 adquired
Consumer - 2: job #39 finished!
Producer - 73: producing job #73...
Producer - 74: producing job #74...
Producer - 75: producing job #75...
Producer - 10: job #10 ready to do!
Producer - 40: job #40 ready to do!
Consumer - 3: job #10 adquired
Consumer - 3: job #10 finished!
Producer - 76: producing job #76...
Producer - 77: producing job #77...
...
Producer - 82: producing job #82...
Producer - 83: producing job #83...
Producer - 84: producing job #84...
Consumer - 4: job #40 adquired
Consumer - 4: job #40 finished!
Producer - 85: producing job #85...
Producer - 86: producing job #86...
Producer - 71: job #71 ready to do!
Producer - 87: producing job #87...
Consumer - 5: job #71 adquired
Consumer - 5: job #71 finished!
Producer - 88: producing job #88...
Producer - 92: producing job #92...
Producer - 91: producing job #91...
...
Producer - 96: producing job #95...
Producer - 98: producing job #98...
Producer - 99: producing job #99...
Producer - 84: job #84 ready to do!
Consumer - 6: job #84 adquired
Consumer - 6: job #84 finished!
...
Consumer - 97: job #63 adquired
Consumer - 97: job #63 finished!
Producer - 90: job #90 ready to do!
Consumer - 98: job #90 adquired
Consumer - 98: job #90 finished!
Producer - 87: job #87 ready to do!
Consumer - 99: job #87 adquired
Consumer - 99: job #87 finished!
Process finished with exit code 0