在 Java 中实施资源 read/write 锁定

Implementing a resource read/write lock in Java

我正在尝试为多个线程并发访问的资源实现一个简单的 read/write 锁。工作人员随机尝试读取或写入共享对象。当设置了读锁时,worker 在锁被释放之前不能写。当设置写锁时,不允许读取和写入。 虽然我的实现似乎有效,但我认为它在概念上是错误的。

发生的读取操作应允许同时发生更多的读取操作,从而导致读取的总数大于写入的数量。我的程序生成的数字遵循工人执行这些操作的概率。

我觉得我的实现实际上根本不是并发的,但我很难找出错误。我真的很感激被指出正确的方向。

派遣和终止工人的主要class:

class Main {

    private static final int THREAD_NUMBER = 4;

    public static void main(String[] args) {
        // creating workers
        Thread[] workers = new Thread[THREAD_NUMBER];
        for (int i = 0; i < THREAD_NUMBER; i++) {
            workers[i] = new Thread(new Worker(i + 1));
        }
        System.out.println("Spawned workers: " + THREAD_NUMBER);

        // starting workers
        for (Thread t : workers) {
            t.start();
        }
        try {
            Thread.sleep((long) 10000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // stopping workers
        System.out.println("Stopping workers...");
        for (Thread t : workers) {
            t.interrupt();
        }
    }
}

资源class:

class Resource {

    enum ResourceLock {
        ON,
        OFF
    } 

    private static Resource instance = null;
    private ResourceLock writeLock = ResourceLock.OFF;
    private ResourceLock readLock = ResourceLock.OFF;

    private Resource() {}

    public static synchronized Resource getInstance() {
        if (instance == null) {
            instance = new Resource();
        }
        return instance;
    }

    public ResourceLock getWriteLock() {
        return writeLock;
    }
    public ResourceLock getReadLock() {
        return readLock;
    }
    public void setWriteLock() {
        writeLock = ResourceLock.ON;
    }
    public void setReadLock() {
        readLock = ResourceLock.ON;
    }
    public void releaseWriteLock() {
        writeLock = ResourceLock.OFF;
    }
    public void releaseReadLock() {
        readLock = ResourceLock.OFF;
    }
}

最后是工人 class:

import java.util.Random;

class Worker implements Runnable {

    private static final double WRITE_PROB = 0.5;
    private static Random rand = new Random();
    private Resource res;
    private int id;

    public Worker(int id) {
        res = Resource.getInstance();
        this.id = id;
    }

    public void run() {
        message("Started.");
        while (!Thread.currentThread().isInterrupted()) {
            performAction();
        }
    }

    private void message(String msg) {
        System.out.println("Worker " + id + ": " + msg);
    }

    private void read() {
        synchronized(res) {
            while (res.getWriteLock() == Resource.ResourceLock.ON) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            res.setReadLock();
            // perform read
            try {
                Thread.sleep((long) 500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            res.releaseReadLock();
            res.notifyAll();
        }
        message("Finished reading.");
    }

    private void write() {
        synchronized(res) {
            while (res.getWriteLock() == Resource.ResourceLock.ON || res.getReadLock() == Resource.ResourceLock.ON) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            res.setWriteLock();
            // perform write
            try {
                Thread.sleep((long) 500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            res.releaseWriteLock();
            res.notifyAll();
        }
        message("Finished writing.");
    }

    private void performAction() {
        double r = rand.nextDouble();
        if (r <= WRITE_PROB) {
            write();
        } else {
            read();
        }
    }
}

拥有两个单独的读写锁背后的原因是我希望能够将操作及其对锁的查询原子化。

这是我在写入概率为 0.5 时得到的输出示例:

Spawned workers: 4
Worker 2: Started.
Worker 3: Started.
Worker 1: Started.
Worker 4: Started.
Worker 2: Finished writing.
Worker 4: Finished reading.
Worker 1: Finished writing.
Worker 3: Finished writing.
Worker 1: Finished reading.
Worker 4: Finished writing.
Worker 2: Finished reading.
Worker 4: Finished reading.
Worker 1: Finished reading.
Worker 3: Finished writing.
Worker 1: Finished writing.
Worker 4: Finished writing.
Worker 2: Finished writing.
Worker 4: Finished writing.
Worker 1: Finished reading.
Worker 3: Finished writing.
Worker 1: Finished writing.
Worker 4: Finished reading.
Worker 2: Finished writing.
Stopping workers...
Worker 4: Finished writing.
Worker 1: Finished writing.
Worker 3: Finished reading.
Worker 2: Finished reading.

非常感谢帮助。

您正在 synchronized 块中执行整个操作,因此没有并发。此外,任何锁类型都没有优先级,因为最多一个线程可以拥有一个锁。不在 synchronized 块中执行整个操作将不适用于您当前的代码,因为每个 reader 最后都会执行 readLock = ResourceLock.OFF,无论有多少 reader在那里。如果没有计数器,您将无法正确支持多个 reader。

除此之外,它是一个奇怪的代码结构,提供 Resource class 维护状态,但完全由调用者使用它做正确的事情。这不是处理责任和封装的方式。

一个实现可能看起来像

class ReadWriteLock {
    static final int WRITE_LOCKED = -1, FREE = 0;

    private int numberOfReaders = FREE;
    private Thread currentWriteLockOwner;

    public synchronized void acquireReadLock() throws InterruptedException {
        while(numberOfReaders == WRITE_LOCKED) wait();
        numberOfReaders++;
    }
    public synchronized void releaseReadLock() {
        if(numberOfReaders <= 0) throw new IllegalMonitorStateException();
        numberOfReaders--;
        if(numberOfReaders == FREE) notifyAll();
    }
    public synchronized void acquireWriteLock() throws InterruptedException {
        while(numberOfReaders != FREE) wait();
        numberOfReaders = WRITE_LOCKED;
        currentWriteLockOwner = Thread.currentThread();
    }
    public synchronized void releaseWriteLock() {
        if(numberOfReaders!=WRITE_LOCKED || currentWriteLockOwner!=Thread.currentThread())
            throw new IllegalMonitorStateException();
        numberOfReaders = FREE;
        currentWriteLockOwner = null;
        notifyAll();
    }
}

它只是简单地使用一个获取读锁的计数器,当有写锁时将计数器设置为-1(因此写锁不能嵌套)。在没有写锁的情况下获取读锁可能会成功,所以不需要为它们实现优先级,当另一个线程已经有真正的锁时成功的可能性就足够了。事实上,当 reader 的数量明显多于 writers 时,您可能会遇到 “starving writer” problem.

工人简化为

class Worker implements Runnable {
    private static final double WRITE_PROB = 0.5;
    private static final Random rand = new Random();
    private final ReadWriteLock theLock;
    private final int id;

    public Worker(int id, ReadWriteLock lock) {
        theLock = lock;
        this.id = id;
    }

    public void run() {
        message("Started.");
        while(!Thread.currentThread().isInterrupted()) {
            performAction();
        }
    }

    private void message(String msg) {
        System.out.println("Worker " + id + ": " + msg);
    }

    private void read() {
        try {
            theLock.acquireReadLock();
        } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // perform read
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally { theLock.releaseReadLock(); }
        message("Finished reading.");
    }

    private void write() {
        try {
            theLock.acquireWriteLock();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // perform write
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally { theLock.releaseWriteLock(); }
        message("Finished writing.");
    }

    private void performAction() {
        double r = rand.nextDouble();
        if (r <= WRITE_PROB) {
            write();
        } else {
            read();
        }
    }
}

请注意,我在这里避免使用全局变量。锁应该传递给构造函数。同样重要的是方法 return 在获取锁期间被中断。像在原始代码中那样自中断并重试获取将导致无限循环,因为下一次等待将在您恢复当前线程的中断状态后再次抛出 InterruptedException。当然,在没有锁的情况下继续也是错误的,所以唯一有效的选择不是恢复中断状态或立即 returning。

您的主程序唯一的变化是构造一个传递锁实例:

ReadWriteLock sharedLock = new ReadWriteLock();
// creating workers
Thread[] workers = new Thread[THREAD_NUMBER];
for (int i = 0; i < THREAD_NUMBER; i++) {
    workers[i] = new Thread(new Worker(i + 1, sharedLock));
}
System.out.println("Spawned workers: " + THREAD_NUMBER);

// starting workers
for (Thread t : workers) {
    t.start();
}
try {
    Thread.sleep(10000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

// stopping workers
System.out.println("Stopping workers...");
for (Thread t : workers) {
    t.interrupt();
}

这是 ReadWriteLock 的简单实现,写入操作具有更高的优先级:

public class ReadWriteLock{

  private int readers       = 0;
  private int writers       = 0;
  private int writeRequests = 0;

  public synchronized void lockRead() throws InterruptedException{
    while(writers > 0 || writeRequests > 0){
      wait();
    }
    readers++;
  }

  public synchronized void unlockRead(){
    readers--;
    notifyAll();
  }

  public synchronized void lockWrite() throws InterruptedException{
    writeRequests++;

    while(readers > 0 || writers > 0){
      wait();
    }
    writeRequests--;
    writers++;
  }

  public synchronized void unlockWrite() throws InterruptedException{
    writers--;
    notifyAll();
  }
}

来源:http://tutorials.jenkov.com/java-concurrency/read-write-locks.html