什么是ReentrantLock?

ReentrantLock是Java的一个锁,用于协作多线程,同时也是一个独占锁

ReentrantLock应用场景

保证并发安全

代替synchronized实现同步

线程通信

如线程交替打印1~100

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class JUCTest {

private static volatile int count = 0;
private static final int MAX = 100;

private static final ReentrantLock lock = new ReentrantLock();
private static final Condition condition = lock.newCondition();

public static void main(String[] args) {
Thread t1 = new Thread(new Print(), "thread-1");
Thread t2 = new Thread(new Print(), "thread-2");
t1.start();
t2.start();
}

static class Print implements Runnable {

@Override
public void run() {
while (count <= MAX) {
lock.lock();
try {
if (count <= MAX) {
System.out.println(Thread.currentThread().getName() + ": " + count);
count++;
}
condition.signalAll();
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}
}
}

ReentrantLock源码解析

构造器

默认是非公平锁

1
2
3
4
5
6
7
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

lock()

对于lock()方法,先是在ReentrantLock内部实现了自己的非公平以及公平的同步器,可以自定义tryAcquire()方法的实现,这是lock()方法获取锁的一个重要步骤,同时对于没有获取到锁的线程,我们需要有一个数据结构保存起来,让他们能够等待时机获取锁,这也就是同步队列。

重要的三个步骤:

  1. tryAcquire:每个新的线程都会尝试获取锁
  2. addWaiter:获取锁成功可以执行自己的任务,获取锁失败把自己包装成节点加入同步队列(同步队列是有一个虚拟头节点的,用来唤醒下一个抢锁的线程)
  3. acquireQueued:把刚刚加入同步队列内的线程阻塞
1
2
3
public void lock() {
sync.lock();
}

公平锁

1
2
3
4
5
6
7
8
9
final void lock() {
acquire(1);
}

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

公平锁tryAcquire()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 当前资源没人占用才能尝试获取锁
if (c == 0) {
// 先看看有没有前面的节点,如果没有其他节点再CAS尝试获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入锁原理
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

非公平锁

只不过多一个先CAS抢锁的判断

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

非公平锁tryAcquire()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 当前资源没人占用才能尝试获取锁
if (c == 0) {
// CAS尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入锁原理
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

unlock()

对于unlock()方法,实现比较简单,不存在考虑竞争等问题,只需要把自己的资源释放并且唤醒排队的第一个节点就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

await()

对于await()方法,功能类似于synchronized的Object.wait(),不过synchronized只能用一个Object对象,而ReentrantLock可以有很多个Condition,await()是把持有锁的线程转移到条件队列,可以实现线程间的通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 把线程包装成节点加入条件队列,设置waitstatus == CONDITION
Node node = addConditionWaiter();
// 释放线程持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断线程是否在同步队列
while (!isOnSyncQueue(node)) {
// 到这里说明当前线程在条件队列,可以阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

addConditionWaiter()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

fullyRelease()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 释放线程持有的锁,并唤醒后续节点
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

signal()

1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

doSignal()

用enq()方法把线程放到同步队列。

1
2
3
4
5
6
7
8
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

transferForSignal()

1
2
3
4
5
6
7
8
9
10
11
12
13
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 把当前节点加入同步队列,返回前驱节点
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

signalAll()

比signal()多一个循环

1
2
3
4
5
6
7
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

doSignalAll()

1
2
3
4
5
6
7
8
9
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

总结

ReentrantLock是基于AQS实现的,有几个重要的属性:资源/锁state、Node节点、同步队列的头尾节点head和tail,条件队列的头为节点headWaiter和tailWaiter,比较难以理解的就是lock()和await(),对于release()和signal()的理解会比较容易一些。

ReentrantLock和synchronized的结构很像,都由一个资源/监视器、一个同步队列、一个条件队列组成,对于线程的控制方法在抽象上是大差不差的,只不过实际上实现有所区别。