ReentrantReadWriteLock
约 1803 字大约 6 分钟
ReentrantReadWriteLock
15. ReentrantReadWriteLock
- 特点:公平、非公平锁、读写锁、重入锁、锁降级
1. 构造方法
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public static class ReadLock implements Lock, java.io.Serializable{
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
public static class WriteLock implements Lock, java.io.Serializable{
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
}
2. 写锁的获取tryAcquire(int acquires)与释放tryRelease(int releases)
- 写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。
- 写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。
static final int SHARED_SHIFT = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1 //1左移16位然后减1 0x0000FFFF
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //取同步状态的低16位 表示写锁的获取次数
/*
* 在同步状态state上维护多个读线程和一个写线程的状态。读写锁将变量切分成了两个部分,高16位表示读,低16位表示写
* 同步状态state不等于0时,当写状态(state&0x0000FFFF(抹去高16位与运算得到低16位的值))等于0时,则读状态(state>>>16)大于0,即读锁已被获取。
* 如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量
* 功能:写锁的获取:持有锁的线程state+1,非持有锁的线程CAS操作成功后获得锁
* 1 判断state是否有线程已经获取锁,若不等于0执行2,等于0则执行3
* 2 若持有锁的是当前线程时设置state+1并返回true(重入锁)
* 3 writerShouldBlock(非公平锁false(默认),公平锁hasQueuedPredecessors())或者CAS设置state失败,获取锁失败
* 4 将持有线程的设置为当前线程并返回true
*/
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();//1
int w = exclusiveCount(c);
if (c != 0) {//2
// 存在读锁或者当前获取线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
final boolean writerShouldBlock() {//非公平锁(默认)
return false; // writers can always barge
}
final boolean writerShouldBlock() {//公平锁
return hasQueuedPredecessors();
}
/*
* 写锁的释放:设置state=state-1如果state=0则独占锁线程为null
* 1 state-1
* 2 写状态为0则占有锁的线程为null
* 3 设置state
*/
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;//1
boolean free = exclusiveCount(nextc) == 0;//2
if (free)
setExclusiveOwnerThread(null);
setState(nextc);//3
return free;
}
3. 读锁的获取与释放
- 读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。
- getReadHoldCount()方法,作用是返回当前线程获取读锁的次数。读状态是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLocal中,由线程自身维护,这使获取读锁的实现变得复杂。因此,这里将获取读锁的代码做了删减,保留必要的部分
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //同步状态的高16位用来表示读锁被获取的次数
/*
* 读锁的获取
* 1 若持有锁的线程不为当前线程时,锁获取失败
* 2 获取同步状态state的高16位的读锁获取次数
* 3 如果头结点的下一个节点不是共享节点且CAS操作state设置高位成功则成功获取锁,如果不成功则执行4
* 4 调用fullTryAcquireShared()重复CAS操作直至获取共享锁(待看)
*/
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&//1
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);//2
if (!readerShouldBlock() && //3
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//同步状态的高16位用来表示读锁被获取的次数
//省略无关代码
return 1;
}
return fullTryAcquireShared(current);
}
final boolean readerShouldBlock() {//非公平锁(默认)
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null && (s = h.next) != null &&
!s.isShared() && s.thread != null;
}
final boolean readerShouldBlock() {//公平锁
return hasQueuedPredecessors();
}
//待看
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
//读锁的释放:将设置成state-SHARED_UNIT
//读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是(1<<16)。
protected final boolean tryReleaseShared(int unused) {
//省略无关代码...
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
# 读锁简化版
protected final int tryAcquireShared(int unused) {
for (;;) {
int c = getState();
int nextc = c + (1 << 16);
if (nextc < c)
throw new Error("Maximum lock count exceeded");
if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
return -1;
if (compareAndSetState(c, nextc))
return 1;
}
}
4. 锁降级
- 读写锁支持锁降级,遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁,不支持锁升级
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}