跳至主要內容

ReentrantReadWriteLock

HeChuangJun约 1803 字大约 6 分钟

ReentrantReadWriteLock

15. ReentrantReadWriteLock

  • 特点:公平、非公平锁、读写锁、重入锁、锁降级

1. 构造方法

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    public ReentrantReadWriteLock() {
        this(false);
    }
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    public static class ReadLock implements Lock, java.io.Serializable{
        private final Sync sync;
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }
    public static class WriteLock implements Lock, java.io.Serializable{
        private final Sync sync;
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }
    static final class FairSync extends Sync {
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }
    static final class NonfairSync extends Sync {
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }
}

2. 写锁的获取tryAcquire(int acquires)与释放tryRelease(int releases)

  • 写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。
  • 写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。
    static final int SHARED_SHIFT   = 16;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1 //1左移16位然后减1 0x0000FFFF
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //取同步状态的低16位 表示写锁的获取次数
    /*
    * 在同步状态state上维护多个读线程和一个写线程的状态。读写锁将变量切分成了两个部分,高16位表示读,低16位表示写
    * 同步状态state不等于0时,当写状态(state&0x0000FFFF(抹去高16位与运算得到低16位的值))等于0时,则读状态(state>>>16)大于0,即读锁已被获取。
    * 如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量
    *  功能:写锁的获取:持有锁的线程state+1,非持有锁的线程CAS操作成功后获得锁
    * 1 判断state是否有线程已经获取锁,若不等于0执行2,等于0则执行3
    * 2 若持有锁的是当前线程时设置state+1并返回true(重入锁)
    * 3 writerShouldBlock(非公平锁false(默认),公平锁hasQueuedPredecessors())或者CAS设置state失败,获取锁失败
    * 4 将持有线程的设置为当前线程并返回true
    */
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();//1
        int w = exclusiveCount(c);
        if (c != 0) {//2
            // 存在读锁或者当前获取线程不是已经获取写锁的线程
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

    final boolean writerShouldBlock() {//非公平锁(默认)
        return false; // writers can always barge
    }
    final boolean writerShouldBlock() {//公平锁
        return hasQueuedPredecessors();
    }
    /*
    *  写锁的释放:设置state=state-1如果state=0则独占锁线程为null
    *  1 state-1
    *  2 写状态为0则占有锁的线程为null
    *  3 设置state
    */
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;//1
        boolean free = exclusiveCount(nextc) == 0;//2
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);//3
        return free;
    }

3. 读锁的获取与释放

  • 读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。
  • getReadHoldCount()方法,作用是返回当前线程获取读锁的次数。读状态是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLocal中,由线程自身维护,这使获取读锁的实现变得复杂。因此,这里将获取读锁的代码做了删减,保留必要的部分
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; } //同步状态的高16位用来表示读锁被获取的次数
    /*
    *  读锁的获取
    *  1 若持有锁的线程不为当前线程时,锁获取失败 
    *  2 获取同步状态state的高16位的读锁获取次数
    *  3 如果头结点的下一个节点不是共享节点且CAS操作state设置高位成功则成功获取锁,如果不成功则执行4
    *  4 调用fullTryAcquireShared()重复CAS操作直至获取共享锁(待看)
    */
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&//1
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);//2
        if (!readerShouldBlock() && //3
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {//同步状态的高16位用来表示读锁被获取的次数
            //省略无关代码
            return 1;
        }
        return fullTryAcquireShared(current);
    }

    final boolean readerShouldBlock() {//非公平锁(默认)
        return apparentlyFirstQueuedIsExclusive();
    }

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null && (s = h.next)  != null &&
        !s.isShared() && s.thread != null;
    }

    final boolean readerShouldBlock() {//公平锁
        return hasQueuedPredecessors();
    }
    //待看
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
            } else if (readerShouldBlock()) {
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    //读锁的释放:将设置成state-SHARED_UNIT
    //读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是(1<<16)。
    protected final boolean tryReleaseShared(int unused) {
        //省略无关代码...
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
# 读锁简化版
protected final int tryAcquireShared(int unused) {
    for (;;) {
        int c = getState();
        int nextc = c + (1 << 16);
        if (nextc < c)
            throw new Error("Maximum lock count exceeded");
        if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
            return -1;
        if (compareAndSetState(c, nextc))
            return 1;
    }
}

4. 锁降级

  • 读写锁支持锁降级,遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁,不支持锁升级
  class CachedData {
    Object data;
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    void processCachedData() {
            rwl.readLock().lock();
            if (!cacheValid) {
                // Must release read lock before acquiring write lock
                rwl.readLock().unlock();
                rwl.writeLock().lock();
                try {
                    // Recheck state because another thread might have
                    // acquired write lock and changed state before we did.
                    if (!cacheValid) {
                        data = ...
                cacheValid = true;
            }
            // Downgrade by acquiring read lock before releasing write lock
            rwl.readLock().lock();
            } finally {
            rwl.writeLock().unlock(); // Unlock write, still hold read
            }
        }
    
        try {
            use(data);
        } finally {
            rwl.readLock().unlock();
        }
        }
    }