什么是ReentrantLock?
ReentrantLock是Java的一个锁,用于协作多线程,同时也是一个独占锁
ReentrantLock应用场景
保证并发安全
代替synchronized实现同步
线程通信
如线程交替打印1~100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class JUCTest {
private static volatile int count = 0; private static final int MAX = 100; private static final ReentrantLock lock = new ReentrantLock(); private static final Condition condition = lock.newCondition();
public static void main(String[] args) { Thread t1 = new Thread(new Print(), "thread-1"); Thread t2 = new Thread(new Print(), "thread-2"); t1.start(); t2.start(); }
static class Print implements Runnable {
@Override public void run() { while (count <= MAX) { lock.lock(); try { if (count <= MAX) { System.out.println(Thread.currentThread().getName() + ": " + count); count++; } condition.signalAll(); condition.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } }
} } }
|
ReentrantLock源码解析
构造器
默认是非公平锁
1 2 3 4 5 6 7
| public ReentrantLock() { sync = new NonfairSync(); }
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
|
lock()
对于lock()方法,先是在ReentrantLock内部实现了自己的非公平以及公平的同步器,可以自定义tryAcquire()方法的实现,这是lock()方法获取锁的一个重要步骤,同时对于没有获取到锁的线程,我们需要有一个数据结构保存起来,让他们能够等待时机获取锁,这也就是同步队列。
重要的三个步骤:
- tryAcquire:每个新的线程都会尝试获取锁
- addWaiter:获取锁成功可以执行自己的任务,获取锁失败把自己包装成节点加入同步队列(同步队列是有一个虚拟头节点的,用来唤醒下一个抢锁的线程)
- acquireQueued:把刚刚加入同步队列内的线程阻塞
1 2 3
| public void lock() { sync.lock(); }
|
公平锁
1 2 3 4 5 6 7 8 9
| final void lock() { acquire(1); }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
|
公平锁tryAcquire()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
|
非公平锁
只不过多一个先CAS抢锁的判断
1 2 3 4 5 6
| final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
|
非公平锁tryAcquire()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
|
unlock()
对于unlock()方法,实现比较简单,不存在考虑竞争等问题,只需要把自己的资源释放并且唤醒排队的第一个节点就行了。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void unlock() { sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
|
await()
对于await()方法,功能类似于synchronized的Object.wait(),不过synchronized只能用一个Object对象,而ReentrantLock可以有很多个Condition,await()是把持有锁的线程转移到条件队列,可以实现线程间的通信。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
|
addConditionWaiter()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
|
fullyRelease()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
|
signal()
1 2 3 4 5 6 7
| public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
|
doSignal()
用enq()方法把线程放到同步队列。
1 2 3 4 5 6 7 8
| private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
|
transferForSignal()
1 2 3 4 5 6 7 8 9 10 11 12 13
| final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
|
signalAll()
比signal()多一个循环
1 2 3 4 5 6 7
| public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
|
doSignalAll()
1 2 3 4 5 6 7 8 9
| private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
|
总结
ReentrantLock是基于AQS实现的,有几个重要的属性:资源/锁state、Node节点、同步队列的头尾节点head和tail,条件队列的头为节点headWaiter和tailWaiter,比较难以理解的就是lock()和await(),对于release()和signal()的理解会比较容易一些。
ReentrantLock和synchronized的结构很像,都由一个资源/监视器、一个同步队列、一个条件队列组成,对于线程的控制方法在抽象上是大差不差的,只不过实际上实现有所区别。