有条件的公平读写锁
Fair ReadWriteLock with Conditions
首先我检查了以前关于这个主题的问题,但是 none 适合我的具体问题。
我得到了以下代码,说明了带有时间戳的传感器和存储在双数组中的数据,另外还有我实现的 FairRWLock 的一个实例。
class LockedSensors implements Sensors {
long time = 0;
double data[];
FairRWLock lock = new FairRWLock();
LockedSensors() {
time = 0;
}
// store data and timestamp
// if and only if data stored previously is older (lower timestamp)
public void update(long timestamp, double[] data) {
lock.writeAcquire();
if (timestamp > time) {
if (this.data == null)
this.data = new double[data.length];
time = timestamp;
for (int i = 0; i < data.length; ++i)
this.data[i] = data[i];
}
lock.writeRelease();
}
// pre: val != null
// pre: val.length matches length of data written via update
// if no data has been written previously, return 0
// otherwise return current timestamp and fill current data to array passed
// as val
public long get(double val[]) {
try{
lock.readAcquire();
if (time == 0) return 0;
for (int i = 0; i < data.length; ++i)
val[i] = data[i];
return time;
} finally{lock.readRelease();}
}
}
它支持更新,这取决于接收到新数据的时间,以及提取存储在特定传感器中的数据。
这是我对 FairRWLock 的实现:
class FairRWLock{
private int readers = 0, writers = 0, readersWaiting = 0, writersWaiting = 0, writersWait = 0;
private static final int ReaderPriority = 30;
private Lock lock = new ReentrantLock();
private Condition readerPass = lock.newCondition();
private Condition writerPass = lock.newCondition();
/*
* readers denotes the number of current readers, writers equivalent, readersWaiting denotes the number of readers
* awaiting their signal, writersWaiting equivalent. writersWait denotes the number of readers the writers have to
* let pass before they can proceed, this amount is controlled by the ReaderPriority (reset occurs when writer releases)
*/
/*
* increment the number of waiting readers, check if there are any currently working writers OR writers are waiting
* whilst they don't have to let any readers pass. When signaled, decrement readersWaiting, decrement the number of
* readers the writers have to let pass and increment the number of current readers.
*/
public void readAcquire(){
lock.lock();
readersWaiting++;
while(writers > 0 || (writersWaiting > 0 && writersWait <= 0)){
try {
readerPass.await();
} catch (InterruptedException e) {}
}
readersWaiting--;
writersWait--;
readers++;
lock.unlock();
}
/*
* simply decrement number of readers and signal the threads that have to be signaled
*/
public void readRelease(){
lock.lock();
readers--;
signaling();
lock.unlock();
}
/*
* increment number of waiting writers, check if there are currently working writers OR readers OR readers currently
* have priority over the writers. When signaled decrement writersWaiting, increment number of writers
*/
public void writeAcquire(){
lock.lock();
writersWaiting++;
while(writers > 0 || readers > 0 || (readersWaiting > 0 && writersWait > 0)){
try{
writerPass.await();
} catch(InterruptedException e) {}
}
writersWaiting--;
writers++;
lock.unlock();
}
/*
* simply decrement number of current writers, reset the number of readers the writers have to let pass before
* another writer may pass. signal the ones that should be
*/
public void writeRelease(){
lock.lock();
writers--;
writersWait = ReaderPriority;
signaling();
lock.unlock();
}
/*
* check first if readers currently got priority over the writers. if so (readersWaiting??) ? signal them : signalAll,
* if not (writersWaiting??) ? signal them : signalAll
*/
private void signaling(){
if(writersWait > 0){
if(readersWaiting > 0) readerPass.signalAll();
else writerPass.signal();
} else{
if(writersWaiting > 0) writerPass.signal();
else readerPass.signalAll();
}
}
}
我对按条件锁定不是很熟悉,而且我的代码似乎遇到饥饿甚至死锁的问题。但是我找不到问题(很可能是在 FairRWLock 实现中的某个地方)。
试图在非公平锁之上构建公平锁是没有意义的。当线程进入 readAcquire()
或 writeAcquire()
时,它们正在调用 lock.lock()
,如果没有立即成功,它们可能会进入等待状态并被任意数量的线程才能继续。
至此,无论你以后做什么,都已经不可能再建立公平了。但值得注意的是,您还遗漏了 await()
的含义。此操作将暂时释放锁,因为只有这样才能让其他线程有机会满足您正在等待的条件。当线程得到 signal()
ed 时,它必须重新获取锁,这又是一个不公平的操作。任意数量的线程都可以发出新的锁定请求,在很久以前调用 await()
的线程将继续进行之前完全改变这种情况。
说到底,你不要公平。 update
操作旨在忽略过时的更新,因此如果新的 update
请求可以更快地进行,这实际上是一个胜利,因为挂起的旧请求将变为无操作。对于并发 get
请求,您实际上根本不想阻塞,所有读取请求都应该能够 运行 并发,但是,当然,您想要一致性(线程安全)并且没有 writer starvation .
最好的解决方案是完全不加锁,实现整个操作无锁:
class LockedSensors implements Sensors {
private static final class State {
final long time;
final double[] data;
State(long t, double[] in) {
time = t;
data = in.clone();
}
}
final AtomicReference<State> current = new AtomicReference<>();
LockedSensors() {}
// store data and timestamp
// if and only if data stored previously is older (lower timestamp)
public void update(long timestamp, double[] data) {
State newState = null;
for(;;) {
State old = current.get();
if(old != null && old.time > timestamp) return;
if(newState == null) newState = new State(timestamp, data);
if(current.compareAndSet(old, newState)) return;
}
}
// pre: val != null
// pre: val.length matches length of data written via update
// if no data has been written previously, return 0
// otherwise return current timestamp and fill current data to array passed as val
public long get(double[] val) {
State actual = current.get();
if(actual == null) return 0;
if(actual.data.length != val.length)
throw new IllegalArgumentException();
System.arraycopy(actual.data, 0, val, 0, actual.data.length);
return actual.time;
}
}
在这里,读者始终可以继续 returning 上次完成更新的结果,而不会阻止任何作者。甚至作者之间也不会互相阻塞,但如果中间发生另一个更新,则可能不得不旋转,但由于每个作者都会 return 一旦遇到更新的时间戳,并且总会有至少一个作者在制作progress,这里没有问题。
首先我检查了以前关于这个主题的问题,但是 none 适合我的具体问题。
我得到了以下代码,说明了带有时间戳的传感器和存储在双数组中的数据,另外还有我实现的 FairRWLock 的一个实例。
class LockedSensors implements Sensors {
long time = 0;
double data[];
FairRWLock lock = new FairRWLock();
LockedSensors() {
time = 0;
}
// store data and timestamp
// if and only if data stored previously is older (lower timestamp)
public void update(long timestamp, double[] data) {
lock.writeAcquire();
if (timestamp > time) {
if (this.data == null)
this.data = new double[data.length];
time = timestamp;
for (int i = 0; i < data.length; ++i)
this.data[i] = data[i];
}
lock.writeRelease();
}
// pre: val != null
// pre: val.length matches length of data written via update
// if no data has been written previously, return 0
// otherwise return current timestamp and fill current data to array passed
// as val
public long get(double val[]) {
try{
lock.readAcquire();
if (time == 0) return 0;
for (int i = 0; i < data.length; ++i)
val[i] = data[i];
return time;
} finally{lock.readRelease();}
}
}
它支持更新,这取决于接收到新数据的时间,以及提取存储在特定传感器中的数据。
这是我对 FairRWLock 的实现:
class FairRWLock{
private int readers = 0, writers = 0, readersWaiting = 0, writersWaiting = 0, writersWait = 0;
private static final int ReaderPriority = 30;
private Lock lock = new ReentrantLock();
private Condition readerPass = lock.newCondition();
private Condition writerPass = lock.newCondition();
/*
* readers denotes the number of current readers, writers equivalent, readersWaiting denotes the number of readers
* awaiting their signal, writersWaiting equivalent. writersWait denotes the number of readers the writers have to
* let pass before they can proceed, this amount is controlled by the ReaderPriority (reset occurs when writer releases)
*/
/*
* increment the number of waiting readers, check if there are any currently working writers OR writers are waiting
* whilst they don't have to let any readers pass. When signaled, decrement readersWaiting, decrement the number of
* readers the writers have to let pass and increment the number of current readers.
*/
public void readAcquire(){
lock.lock();
readersWaiting++;
while(writers > 0 || (writersWaiting > 0 && writersWait <= 0)){
try {
readerPass.await();
} catch (InterruptedException e) {}
}
readersWaiting--;
writersWait--;
readers++;
lock.unlock();
}
/*
* simply decrement number of readers and signal the threads that have to be signaled
*/
public void readRelease(){
lock.lock();
readers--;
signaling();
lock.unlock();
}
/*
* increment number of waiting writers, check if there are currently working writers OR readers OR readers currently
* have priority over the writers. When signaled decrement writersWaiting, increment number of writers
*/
public void writeAcquire(){
lock.lock();
writersWaiting++;
while(writers > 0 || readers > 0 || (readersWaiting > 0 && writersWait > 0)){
try{
writerPass.await();
} catch(InterruptedException e) {}
}
writersWaiting--;
writers++;
lock.unlock();
}
/*
* simply decrement number of current writers, reset the number of readers the writers have to let pass before
* another writer may pass. signal the ones that should be
*/
public void writeRelease(){
lock.lock();
writers--;
writersWait = ReaderPriority;
signaling();
lock.unlock();
}
/*
* check first if readers currently got priority over the writers. if so (readersWaiting??) ? signal them : signalAll,
* if not (writersWaiting??) ? signal them : signalAll
*/
private void signaling(){
if(writersWait > 0){
if(readersWaiting > 0) readerPass.signalAll();
else writerPass.signal();
} else{
if(writersWaiting > 0) writerPass.signal();
else readerPass.signalAll();
}
}
}
我对按条件锁定不是很熟悉,而且我的代码似乎遇到饥饿甚至死锁的问题。但是我找不到问题(很可能是在 FairRWLock 实现中的某个地方)。
试图在非公平锁之上构建公平锁是没有意义的。当线程进入 readAcquire()
或 writeAcquire()
时,它们正在调用 lock.lock()
,如果没有立即成功,它们可能会进入等待状态并被任意数量的线程才能继续。
至此,无论你以后做什么,都已经不可能再建立公平了。但值得注意的是,您还遗漏了 await()
的含义。此操作将暂时释放锁,因为只有这样才能让其他线程有机会满足您正在等待的条件。当线程得到 signal()
ed 时,它必须重新获取锁,这又是一个不公平的操作。任意数量的线程都可以发出新的锁定请求,在很久以前调用 await()
的线程将继续进行之前完全改变这种情况。
说到底,你不要公平。 update
操作旨在忽略过时的更新,因此如果新的 update
请求可以更快地进行,这实际上是一个胜利,因为挂起的旧请求将变为无操作。对于并发 get
请求,您实际上根本不想阻塞,所有读取请求都应该能够 运行 并发,但是,当然,您想要一致性(线程安全)并且没有 writer starvation .
最好的解决方案是完全不加锁,实现整个操作无锁:
class LockedSensors implements Sensors {
private static final class State {
final long time;
final double[] data;
State(long t, double[] in) {
time = t;
data = in.clone();
}
}
final AtomicReference<State> current = new AtomicReference<>();
LockedSensors() {}
// store data and timestamp
// if and only if data stored previously is older (lower timestamp)
public void update(long timestamp, double[] data) {
State newState = null;
for(;;) {
State old = current.get();
if(old != null && old.time > timestamp) return;
if(newState == null) newState = new State(timestamp, data);
if(current.compareAndSet(old, newState)) return;
}
}
// pre: val != null
// pre: val.length matches length of data written via update
// if no data has been written previously, return 0
// otherwise return current timestamp and fill current data to array passed as val
public long get(double[] val) {
State actual = current.get();
if(actual == null) return 0;
if(actual.data.length != val.length)
throw new IllegalArgumentException();
System.arraycopy(actual.data, 0, val, 0, actual.data.length);
return actual.time;
}
}
在这里,读者始终可以继续 returning 上次完成更新的结果,而不会阻止任何作者。甚至作者之间也不会互相阻塞,但如果中间发生另一个更新,则可能不得不旋转,但由于每个作者都会 return 一旦遇到更新的时间戳,并且总会有至少一个作者在制作progress,这里没有问题。