信号量不能避免线程丢失
Semaphores not avoiding thread loss
这是我的第一个问题,请多多包涵。
我目前正在 Java 中处理关于多线程和并发的 UNI 任务,我们被要求使用不同的线程锁定方法实现各种版本的“呼叫中心”,其中之一是信号量。我将直接进入代码以显示我的问题所在:
制作人Class:
public final class Caller implements Runnable {
private final CallCenter callCenter;
public Caller(long id, CallCenter callCenter) {
this.callCenter = callCenter;
}
@Override
public void run() {
try {
callCenter.receive(new Call());
} catch(Exception ex) {
throw new RuntimeException(ex);
}
}
}
消费者Class:
public final class Operator implements Runnable {
private final CallCenter callCenter;
private Call call;
public Operator(CallCenter callCenter) {
this.callCenter = callCenter;
}
@Override
public void run() {
try {
this.call = callCenter.answer();
} catch(InterruptedException ex) {
throw new RuntimeException(ex);
}
}
public Call getCall() {
return this.call;
}
}
服务:
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.LinkedList;
public final class BoundedCallCenterSemaphore implements BoundedCallCenter {
private final Queue<Call> pendingCalls = new LinkedList<Call>();
private Semaphore semaphore = new Semaphore(MAX_NUMBER_OF_PENDING_CALLS, true);
public void receive(Call call) throws Exception {
semaphore.acquire();
pendingCalls.add(call);
}
public Call answer() throws InterruptedException {
semaphore.release();
return pendingCalls.poll();
}
}
调用实现:
import java.util.concurrent.atomic.AtomicLong;
public final class Call {
private static final AtomicLong currentId = new AtomicLong();
private final long id = currentId.getAndIncrement();
public long getId() {
return id;
}
}
免责声明
我知道我可能没有按照预期的方式使用信号量,但是阅读官方文档 blogs/answers 根本没有帮助。
我们有以下约束:仅修改服务Class,使用信号量解决并且仅使用Semaphore.acquire()和Semaphore.receive()以避免竞争和忙等待,不允许其他方法或线程锁定结构
实际问题:
我将避免在此处发布我们教授编写的全部测试,只知道向服务发送了 100 个调用,为简单起见,每个调用者仅调用一次,每个接线员仅响应一次。当在没有信号量的情况下实现呼叫中心时,您会遇到由 while 循环生成的繁忙等待,并且并发性没有得到很好的管理,因为如果不同的线程同时操作,某些调用可能会被应答两次或更多次。这里的任务是消除繁忙的等待,并确保每个呼叫只被接听和接听一次。我尝试使用上面报告的信号量,虽然消除了忙等待,但一些呼叫最终根本没有得到应答。关于我做错了什么的任何建议?如何确保每个电话只接听一次?
最后我是用三个信号量搞定的。第一个信号量 new Semaphore(MAX_NUMBER_OF_PENDING_CALLS, true)
在 pendingCalls.size() >= MAX_NUMBER_OF_PENDING_CALLS
时从阻止新条目的意义上保护队列。第二个信号量 new Semaphore(1, true)
保护生产者线程,一次只允许一个线程访问队列以进行添加操作。第三个也是最后一个信号量在没有许可的情况下开始,并等待第一个生产者线程将第一个调用插入缓冲区 new Semaphore(0, true)
.
代码
public final class BoundedCallCenterSemaphore implements BoundedCallCenter {
private final LinkedList<Call> pendingCalls = new LinkedList<Call>();
static Semaphore receiver = new Semaphore(1, true);
static Semaphore storage = new Semaphore(MAX_NUMBER_OF_PENDING_CALLS, true);
static Semaphore operants = new Semaphore(0, true);
public void receive(Call call) throws Exception {
try {
storage.acquire();
}
catch (InterruptedException e)
{
}
try {
receiver.acquire();
}
catch (InterruptedException e)
{
}
synchronized (pendingCalls) {
pendingCalls.add(call);
operants.release();
}
}
public Call answer() throws InterruptedException {
try
{
operants.acquire();
}
catch (InterruptedException e)
{
}
Call call = null;
synchronized (pendingCalls) {
call = pendingCalls.poll();
storage.release();
receiver.release();
}
return call;
}
}
这是我的第一个问题,请多多包涵。
我目前正在 Java 中处理关于多线程和并发的 UNI 任务,我们被要求使用不同的线程锁定方法实现各种版本的“呼叫中心”,其中之一是信号量。我将直接进入代码以显示我的问题所在:
制作人Class:
public final class Caller implements Runnable {
private final CallCenter callCenter;
public Caller(long id, CallCenter callCenter) {
this.callCenter = callCenter;
}
@Override
public void run() {
try {
callCenter.receive(new Call());
} catch(Exception ex) {
throw new RuntimeException(ex);
}
}
}
消费者Class:
public final class Operator implements Runnable {
private final CallCenter callCenter;
private Call call;
public Operator(CallCenter callCenter) {
this.callCenter = callCenter;
}
@Override
public void run() {
try {
this.call = callCenter.answer();
} catch(InterruptedException ex) {
throw new RuntimeException(ex);
}
}
public Call getCall() {
return this.call;
}
}
服务:
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.LinkedList;
public final class BoundedCallCenterSemaphore implements BoundedCallCenter {
private final Queue<Call> pendingCalls = new LinkedList<Call>();
private Semaphore semaphore = new Semaphore(MAX_NUMBER_OF_PENDING_CALLS, true);
public void receive(Call call) throws Exception {
semaphore.acquire();
pendingCalls.add(call);
}
public Call answer() throws InterruptedException {
semaphore.release();
return pendingCalls.poll();
}
}
调用实现:
import java.util.concurrent.atomic.AtomicLong;
public final class Call {
private static final AtomicLong currentId = new AtomicLong();
private final long id = currentId.getAndIncrement();
public long getId() {
return id;
}
}
免责声明
我知道我可能没有按照预期的方式使用信号量,但是阅读官方文档 blogs/answers 根本没有帮助。 我们有以下约束:仅修改服务Class,使用信号量解决并且仅使用Semaphore.acquire()和Semaphore.receive()以避免竞争和忙等待,不允许其他方法或线程锁定结构
实际问题:
我将避免在此处发布我们教授编写的全部测试,只知道向服务发送了 100 个调用,为简单起见,每个调用者仅调用一次,每个接线员仅响应一次。当在没有信号量的情况下实现呼叫中心时,您会遇到由 while 循环生成的繁忙等待,并且并发性没有得到很好的管理,因为如果不同的线程同时操作,某些调用可能会被应答两次或更多次。这里的任务是消除繁忙的等待,并确保每个呼叫只被接听和接听一次。我尝试使用上面报告的信号量,虽然消除了忙等待,但一些呼叫最终根本没有得到应答。关于我做错了什么的任何建议?如何确保每个电话只接听一次?
最后我是用三个信号量搞定的。第一个信号量 new Semaphore(MAX_NUMBER_OF_PENDING_CALLS, true)
在 pendingCalls.size() >= MAX_NUMBER_OF_PENDING_CALLS
时从阻止新条目的意义上保护队列。第二个信号量 new Semaphore(1, true)
保护生产者线程,一次只允许一个线程访问队列以进行添加操作。第三个也是最后一个信号量在没有许可的情况下开始,并等待第一个生产者线程将第一个调用插入缓冲区 new Semaphore(0, true)
.
代码
public final class BoundedCallCenterSemaphore implements BoundedCallCenter {
private final LinkedList<Call> pendingCalls = new LinkedList<Call>();
static Semaphore receiver = new Semaphore(1, true);
static Semaphore storage = new Semaphore(MAX_NUMBER_OF_PENDING_CALLS, true);
static Semaphore operants = new Semaphore(0, true);
public void receive(Call call) throws Exception {
try {
storage.acquire();
}
catch (InterruptedException e)
{
}
try {
receiver.acquire();
}
catch (InterruptedException e)
{
}
synchronized (pendingCalls) {
pendingCalls.add(call);
operants.release();
}
}
public Call answer() throws InterruptedException {
try
{
operants.acquire();
}
catch (InterruptedException e)
{
}
Call call = null;
synchronized (pendingCalls) {
call = pendingCalls.poll();
storage.release();
receiver.release();
}
return call;
}
}