本文基于JDK8

什么是AQS?

AQS(AbstractQueuedSynchronizer)抽象队列同步器,一个用来构建锁和同步工具类的框架,比如构建ReentrantLock、ReentrantReadWriteLock这样的锁或是CountDownLatch、CyclicBarrier、Semaphore这样的同步工具类,底层呢是在锁内部或者同步工具类内部自定义同步器Sync,让Sync实现AQS的一些方法完成对资源state的控制效果,从而实现同步。

AQS有两种模式

  • 独占:只有一个线程可以获取到资源
  • 共享:很多线程都可以获取到资源

独占和共享是对于资源来说的,即一个资源能否被多个线程所占有,而公平和非公平是对于线程来说的,即一个新的线程是否需要排队才能获取资源。

AQS的几个重要的成员变量

  • Node:同步队列的节点
    • head、tail:用来维护一个虚拟的双向链表(同步队列,在线程尝试获取资源失败后,会进入同步队列队尾,给前继节点设置一个唤醒信号后,自身进入等待状态(通过LockSupport.park(this)),直到被前继节点唤醒)
    • state:用来表示资源的情况
    • waitStatus:当前节点的状态
    • nextWaiter:当前资源是独占还是共享
  • ConditionObject:条件队列的节点
    • firstWaiter、lastWaiter:用来维护一个虚拟的双向链表(条件队列)

每个节点的状态waitStatus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static final Node SHARED = new Node();

static final Node EXCLUSIVE = null;

// 说明当前节点发生异常
static final int CANCELLED = 1;

// 说明当前节点是清醒的,后续节点需要依赖本节点唤醒
static final int SIGNAL = -1;

// 说明当前节点在条件队列中
static final int CONDITION = -2;

// 暂时理解为共享模式下的传播唤醒
static final int PROPAGATE = -3;

独占模式下对state的获取

image-20240308202115837

你会看到AQS大量使用了CAS操作来保证并发安全,例如修改state的值或者说是将新节点尾插到队列等。

基础同步器

那么接下来看看AQS两种模式下如何实现加锁和解锁的,本文锁和资源的意思相同。

  • acquire(int)独占模式下获取资源
  • release(int)独占模式下释放资源
  • acquireShared(int)共享模式下获取资源
  • releaseShared(int)共享模式下释放资源

acquire

尝试获取锁,获取锁失败加入同步队列,看看要不要阻塞(等待前节点唤醒)

对于ReentrantLock,不论是公平锁或者非公平锁,底层都是AQS的acquire()方法。

1
2
3
4
5
6
7
public final void acquire(int arg) {
// 尝试加锁
if (!tryAcquire(arg) &&
// 获取锁失败,把当前线程加入同步队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire:尝试获取锁

模板方法,交给子类的同步器实现

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

在ReentrantLock实现如下

公平锁:返回true说明加锁成功,否则说明加锁失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前state的值
int c = getState();
// 如果c == 0,说明当前没人占有锁
if (c == 0) {
// 如果队列没有节点,说明没人在排队等待获取资源,则通过CAS尝试修改state,如果修改成功则设置当前线程为资源占有者
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前锁已经被占有并且为自己占有的话,则增加state,这也是可重入锁的原理
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 加锁失败
return false;
}

非公平锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前state的值
int c = getState();
// 如果c == 0,说明当前没人占有锁
if (c == 0) {
// 通过CAS尝试修改state,如果修改成功则设置当前线程为资源占有者
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前锁已经被占有并且为自己占有的话,则增加state,这也是可重入锁的原理
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 返回false表示获取锁失败
return false;
}

hasQueuedPredecessors:判断同步队列有没有节点

1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {

Node t = tail;
Node h = head;
Node s;
// 什么时候返回true:至少有一个节点,并且头节点的下一个节点不是当前线程
// 什么时候返回false:队列没有节点,head == tail == null
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

addWaiter:前面tryAcquire获取锁失败把当前线程加入同步队列,并设置为独占模式(Node.EXCLUSIVE)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node addWaiter(Node mode) {
// 先包装当前线程到一个新节点
Node node = new Node(Thread.currentThread(), mode);
// 先尝试一遍把当前节点加入同步队列
Node pred = tail;
// 如果同步队列不为空,使用尾插法插入
if (pred != null) {
node.prev = pred;
// 通过CAS把当前节点设置为尾节点,CAS是为了并发安全
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尝试第二次把当前节点加入同步队列(里面是一个死循环,多线程可能导致添加失败会自旋直到添加成功)
enq(node);
return node;
}

enq:将节点加入同步队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
// 死循环直到加入成功
for (;;) {
Node t = tail;
// 队列为空,进行初始化,先创建一个头节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾插法
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquireQueued:真正入队(节点会被阻塞),入队之前如果是头节点的后继节点会再次尝试获取锁,addWaiter只不过是创建了一个节点结构放入队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 表示当前线程是否被打断
boolean interrupted = false;
for (;;) {
// p是当前线程的前驱节点
final Node p = node.predecessor();
// p是头节点的话尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取锁失败,检查是否可以park并且调用LockSupport.park()阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果出现异常,取消正在等待的节点操作,并且找到一个有效的前边节点指向有效的后边节点
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire:获取锁失败或者前驱节点不是头节点,判断是否应该在同步队列阻塞,返回true说明可以阻塞,否则需要再次尝试获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前面的节点是SIGNAL状态,当前节点可以阻塞
if (ws == Node.SIGNAL)
return true;
// 前面节点是CANCEL状态,要遍历找到一个SIGNAL的前节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 把前节点设置为SIGNAL状态,一般在节点加入同步队列后会到这里,因为需要前驱节点唤醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 返回false,在调用者方法再次尝试获取资源
return false;
}

parkAndCheckInterrupt:调用LockSupport.park阻塞当前节点

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

cancelAcquire:遇到异常时要把当前节点移除出队列,同时维护队列节点的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

// 往前遍历找到状态为SIGNAL的前节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// 前节点的下一个节点
Node predNext = pred.next;

// 设置当前节点状态为CANCELLED
node.waitStatus = Node.CANCELLED;

// 如果我们是队尾节点,CAS移除自己
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 前节点不是头节点并且状态为SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 将前节点的后继节点设置为当前节点的下一个节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 前节点为head节点,帮忙唤醒当前节点的后继节点
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

release

对于ReentrantLock,不论是公平锁或者非公平锁,底层都是AQS的release()方法。

1
2
3
4
5
6
7
8
9
10
11
12
public final boolean release(int arg) {
// 尝试释放资源
if (tryRelease(arg)) {
Node h = head;
// h的状态是SIGNAL
if (h != null && h.waitStatus != 0)
// 唤醒头节点的下一个节点
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease:尝试释放锁

模板方法,交给子类的同步器实现

1
2
3
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

在ReentrantLock实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected final boolean tryRelease(int releases) {
// 剩余的资源量
int c = getState() - releases;
// 防止其它线程释放资源占有者线程的锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// c == 0 才说明当前线程要释放资源,因为有可重入锁机制
if (c == 0) {
// 资源释放成功
free = true;
// 设置当前资源没有人占领
setExclusiveOwnerThread(null);
}
// 设置state
setState(c);
return free;
}

unparkSuccessor:唤醒下一个有效节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void unparkSuccessor(Node node) {
// 当前节点状态
int ws = node.waitStatus;
// 将节点状态SIGNAL转化为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

// 如果当前线程的下一个节点是空的或者CANCELLED状态,则从后往前找有效的下一个节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找有效的下一个节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 调用LockSupport.unpark唤醒下一个有效的节点
if (s != null)
LockSupport.unpark(s.thread);
}

acquireShared

1
2
3
4
5
6
7
8
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试加锁
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

tryAcquireShared:尝试加锁

模板方法,交给子类的同步器实现

1
2
3
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

返回值:

  • 正数:成功获取资源,并且还有剩余可用资源,可以唤醒下一个等待线程;
  • 负数:获取资源失败,准备进入同步队列;
  • 0:获取资源成功,但没有剩余可用资源。

doAcquireSharedInterruptibly:线程判断自己如果是头节点的下一个节点,尝试获取锁,并且根据需要唤醒之后的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将当前节点加入同步队列末尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 如果当前节点的前驱节点是头节点
if (p == head) {
// 尝试获取锁
int r = tryAcquireShared(arg);
// r >= 0 表示还有资源,需要唤醒后续其它的节点
if (r >= 0) {
// 唤醒后续其它的节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 获取锁失败,检查是否可以park并且调用LockSupport.park()阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 如果获取锁失败,取消正在等待的节点操作,并且找到一个有效的前边节点指向有效的后边节点
if (failed)
cancelAcquire(node);
}
}

setHeadAndPropagate:唤醒后续的节点

1
2
3
4
5
6
7
8
9
10
11
12
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 更新头节点
setHead(node);
// 有剩余资源,或者当前节点有唤醒信号
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

releaseShared

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
// 尝试释放当前节点占有的资源
if (tryReleaseShared(arg)) {
// 继续释放后续节点的资源
doReleaseShared();
return true;
}
return false;
}

tryReleaseShared:尝试释放锁,只有state == 0才返回true

1
2
3
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

doReleaseShared:继续释放后续节点的资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void doReleaseShared() {
// 自旋,防止操作时有新的节点加入导致CAS失败
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 节点设置为0之后,唤醒之后的节点
unparkSuccessor(h);
}
// waitStatus为0,说明是从setHeadAndPropagate过来的操作,CAS修改为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

条件同步器

await:尝试获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入到条件队列
Node node = addConditionWaiter();
// 释放同步队列的锁,lock.lock()的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 当前节点是否在同步队列中
while (!isOnSyncQueue(node)) {
// 当前线程不在同步队列,调用LockSupport.park()阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 到这里说明节点被signal()唤醒,需要从条件队列移动到同步队列中
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 解除条件队列中已经取消的等待节点的链接
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

isOnSyncQueue:

1
2
3
4
5
6
7
8
9
10
final boolean isOnSyncQueue(Node node) {
// 当前节点是CONDITION状态或者没有前驱节点,说明不在同步队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果有后继节点,一定在同步队列
if (node.next != null)
return true;
// 从同步队列最后往前遍历看看是否有当前节点
return findNodeFromTail(node);
}

signal:尝试唤醒其他节点

1
2
3
4
5
6
7
8
public final void signal() {
// 判断当前线程是否已经持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

doSignal():

1
2
3
4
5
6
7
8
9
10
11
12
private void doSignal(Node first) {
do {
// 更新头节点
if ((firstWaiter = first.nextWaiter) == null)
// 没有节点了
lastWaiter = null;
// 解除头节点的连接
first.nextWaiter = null;
// transferForSignal将头节点转移到同步队列
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

transferForSignal:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

// 将当前线程加入同步队列,返回节点的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
// 如果前驱节点是CANCELED状态或者设置前继节点的状态失败,调用LockSupport.unpark直接唤醒当前节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}