什么是ReentrantReadWriteLock?

ReentrantLock是Java的一个锁,用于协作多线程,对于读锁来说是一个共享锁,而写锁呢是一个独占锁,支持锁降级(占用写锁同时再获取读锁,但是占用读锁同时不能再获取写锁)

ReentrantReadWriteLock的使用场景

读多于写的场景下保证并发安全同时提高效率

读写缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 缓存对象,这里用jvm缓存
Map<String, String> cache = new HashMap<>();
// 读写锁
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

// 读取操作
public String getData(String key) {
// 加读锁,防止其他线程修改缓存
readWriteLock.readLock().lock();
try {
String value = cache.get(key);
// 如果缓存命中,返回
if(value != null) {
return value;
}
} finally {
// 释放读锁
readWriteLock.readLock().unlock();
}

//如果缓存没有命中,从数据库中加载
readWriteLock.writeLock().lock();
try {
// 细节,为防止重复查询数据库, 再次验证
// 因为get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
String value = cache.get(key);
if(value == null) {
// 这里可以改成从数据库查询
value = "alvin";
cache.put(key, value);
}
return value;
} finally {
readWriteLock.writeLock().unlock();
}
}

ReentrantReadWriteLock源码解析

构造器

默认也是非公平锁

1
2
3
4
5
6
7
8
9
public ReentrantReadWriteLock() {
this(false);
}

public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 读锁的偏移量,用来计算持有读锁的线程数
static final int SHARED_SHIFT = 16;

// 读锁高16位,读锁个数加1,其实是状态值加 2^16
static final int SHARED_UNIT = (1 << SHARED_SHIFT);

// 锁最大数量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

// 写锁掩码,用于标记低16位,00...0011...11
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 读锁计数,当前持有读锁的线程数,c的高16位
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

// 写锁的计数,也就是它的重入次数,c的低16位
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

// 当前线程持有的读锁重入数量
private transient ThreadLocalHoldCounter readHolds;

// 最近一个获取读锁成功的线程计数器
private transient HoldCounter cachedHoldCounter;

// 第一个获取读锁的线程
private transient Thread firstReader = null;

// firstReader的持有数
private transient int firstReaderHoldCount;

// 构造函数
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}

// 持有读锁的线程计数器
static final class HoldCounter {
int count = 0; //持有数
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

// 本地线程计数器,每个线程有自己的计数器,互相隔离
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
// 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值
public HoldCounter initialValue() {
return new HoldCounter();
}
}

lock()

读锁

调用AQS的acquireShared()

1
2
3
4
5
6
7
8
public void lock() {
sync.acquireShared(1);
}

public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

读锁tryAcquireShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected final int tryAcquireShared(int unused) {

Thread current = Thread.currentThread();
int c = getState();
// 写锁被持有并且不是自己持有,返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// r 是当前持有读锁的线程数
int r = sharedCount(c);
// 尝试加读锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 首次获取读锁,初始化firstReader和firstReaderHoldCount
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
}
// 当前线程是首个获取读锁的线程,持有资源数+1
else if (firstReader == current) {
firstReaderHoldCount++;
}
// 更新cachedHoldCounter
else {
HoldCounter rh = cachedHoldCounter;
// 如果需要初始化或者最近获取到读锁的线程计数器不是本线程,设置最近获取到读锁的线程计数器是本线程的计数器
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 更新获取的读锁数量
rh.count++;
}
// 返回1,获取锁成功
return 1;
}
return fullTryAcquireShared(current);
}

写锁

调用AQS的acquire()

1
2
3
4
5
6
7
8
9
public void lock() {
sync.acquire(1);
}

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

写锁tryAcquire()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected final boolean tryAcquire(int acquires) {

Thread current = Thread.currentThread();
// 持有读锁的线程数或者写锁的重入次数
int c = getState();
// 当前写锁的重入次数
int w = exclusiveCount(c);
// c != 0 说明当前有读锁或者写锁存在
if (c != 0) {
// 当前是读锁或者持有写锁的不是自己,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 重入次数达到上限
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 设置state
setState(c + acquires);
return true;
}
// 到达这里说明当前没有锁,尝试加写锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

unlock()

读锁

调用AQS的releaseShared()

1
2
3
4
5
6
7
8
9
10
11
public void unlock() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

读锁tryReleaseShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 当前为第一个获取读锁的线程
if (firstReader == current) {
// 更新线程持有数
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// 获取当前线程的计数器
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 释放资源
for (;;) {
int c = getState();
// 获取剩余资源
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

写锁

调用AQS的release()

1
2
3
4
5
6
7
8
9
10
11
12
13
public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

写锁tryRelease()

这里的操作和ReentrantLock的释放锁很像,都是只考虑独占锁的释放

1
2
3
4
5
6
7
8
9
10
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}