本文基于JDK8
什么是AQS? AQS(AbstractQueuedSynchronizer)抽象队列同步器,一个用来构建锁和同步工具类的框架 ,比如构建ReentrantLock、ReentrantReadWriteLock这样的锁或是CountDownLatch、CyclicBarrier、Semaphore这样的同步工具类,底层呢是在锁内部或者同步工具类内部自定义同步器Sync ,让Sync实现AQS的一些方法完成对资源state的控制效果,从而实现同步。
AQS有两种模式
独占:只有一个线程可以获取到资源
共享:很多线程都可以获取到资源
独占和共享是对于资源来说的,即一个资源能否被多个线程所占有,而公平和非公平是对于线程来说的,即一个新的线程是否需要排队才能获取资源。
AQS的几个重要的成员变量 :
Node:同步队列的节点
head、tail:用来维护一个虚拟的双向链表(同步队列,在线程尝试获取资源失败后,会进入同步队列队尾,给前继节点设置一个唤醒信号后,自身进入等待状态(通过LockSupport.park(this)
),直到被前继节点唤醒)
state:用来表示资源的情况
waitStatus:当前节点的状态
nextWaiter:当前资源是独占还是共享
ConditionObject:条件队列的节点
firstWaiter、lastWaiter:用来维护一个虚拟的双向链表(条件队列)
每个节点的状态waitStatus
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static final Node SHARED = new Node ();static final Node EXCLUSIVE = null ;static final int CANCELLED = 1 ;static final int SIGNAL = -1 ;static final int CONDITION = -2 ;static final int PROPAGATE = -3 ;
独占模式下对state的获取
你会看到AQS大量使用了CAS操作来保证并发安全,例如修改state的值或者说是将新节点尾插到队列等。
基础同步器 那么接下来看看AQS两种模式下如何实现加锁和解锁的,本文锁和资源的意思相同。
acquire(int)
:独占模式 下获取资源
release(int)
:独占模式 下释放资源
acquireShared(int)
:共享模式 下获取资源
releaseShared(int)
:共享模式 下释放资源
acquire 尝试获取锁,获取锁失败加入同步队列,看看要不要阻塞(等待前节点唤醒)
对于ReentrantLock,不论是公平锁或者非公平锁,底层都是AQS的acquire()方法。
1 2 3 4 5 6 7 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire:尝试获取锁 模板方法,交给子类的同步器实现
1 2 3 protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException (); }
在ReentrantLock实现如下
公平锁:返回true说明加锁成功,否则说明加锁失败
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
非公平锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
hasQueuedPredecessors:判断同步队列有没有节点 1 2 3 4 5 6 7 8 9 10 public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
addWaiter:前面tryAcquire获取锁失败把当前线程加入同步队列,并设置为独占模式(Node.EXCLUSIVE) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
enq:将节点加入同步队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued:真正入队(节点会被阻塞),入队之前如果是头节点的后继节点会再次尝试获取锁,addWaiter只不过是创建了一个节点结构放入队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire:获取锁失败或者前驱节点不是头节点,判断是否应该在同步队列阻塞,返回true说明可以阻塞,否则需要再次尝试获取锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
parkAndCheckInterrupt:调用LockSupport.park阻塞当前节点 1 2 3 4 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
cancelAcquire:遇到异常时要把当前节点移除出队列,同时维护队列节点的状态 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
release 对于ReentrantLock,不论是公平锁或者非公平锁,底层都是AQS的release()方法。
1 2 3 4 5 6 7 8 9 10 11 12 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
tryRelease:尝试释放锁 模板方法,交给子类的同步器实现
1 2 3 protected boolean tryRelease (int arg) { throw new UnsupportedOperationException (); }
在ReentrantLock实现如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
unparkSuccessor:唤醒下一个有效节点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
acquireShared 1 2 3 4 5 6 7 8 public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); }
tryAcquireShared:尝试加锁 模板方法,交给子类的同步器实现
1 2 3 protected int tryAcquireShared (int arg) { throw new UnsupportedOperationException (); }
返回值:
正数:成功获取资源,并且还有剩余可用资源,可以唤醒下一个等待线程;
负数:获取资源失败,准备进入同步队列;
0:获取资源成功,但没有剩余可用资源。
doAcquireSharedInterruptibly:线程判断自己如果是头节点的下一个节点,尝试获取锁,并且根据需要唤醒之后的节点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
setHeadAndPropagate:唤醒后续的节点 1 2 3 4 5 6 7 8 9 10 11 12 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
releaseShared 1 2 3 4 5 6 7 8 9 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
tryReleaseShared:尝试释放锁,只有state == 0才返回true 1 2 3 protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException (); }
doReleaseShared:继续释放后续节点的资源 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
条件同步器 await:尝试获取锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); }
isOnSyncQueue: 1 2 3 4 5 6 7 8 9 10 final boolean isOnSyncQueue (Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null ) return false ; if (node.next != null ) return true ; return findNodeFromTail(node); }
signal:尝试唤醒其他节点 1 2 3 4 5 6 7 8 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); }
doSignal(): 1 2 3 4 5 6 7 8 9 10 11 12 private void doSignal (Node first) { do { if ((firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); }
transferForSignal: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }