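What is Semaphore?
Semaphore (java.util.concurrent.Semaphore) is a Java synchronization class used to coordinate multiple threads. It is also a shared lock: several threads can hold permits at the same time.
Semaphore use cases
Throttling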
    import java.io.IOException;
    import java.util.Random;
    import java.util.concurrent.Semaphore;

    public class SemaphoreDemo {
        public static void main(String[] args) throws IOException {
            // At most 2 of the 5 threads hold a permit at any moment.
            Semaphore semaphore = new Semaphore(2);
            for (int i = 0; i < 5; i++) {
                new MyThread(semaphore).start();
            }
            System.in.read(); // block the main thread until input arrives
        }

        static class MyThread extends Thread {
            private final Semaphore semaphore;

            public MyThread(Semaphore semaphore) {
                this.semaphore = semaphore;
            }

            @Override
            public void run() {
                try {
                    // Acquire before entering the inner try, so an interrupted
                    // acquire() never reaches release() without holding a permit.
                    semaphore.acquire();
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.sleep(1000 + new Random().nextInt(1000));
                    } finally {
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
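To check the throttling effect rather than read it off the console, the following sketch counts how many workers are inside the guarded section at once. The class name `ThrottleCheck` and the helper `maxConcurrency` are made up for this illustration, and the 50 ms sleep is an arbitrary stand-in for real work.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottleCheck {
    // Hypothetical helper: runs `threads` workers behind a semaphore with
    // `permits` permits and returns the highest number of workers that were
    // inside the guarded section at the same time.
    public static int maxConcurrency(int permits, int threads) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit was taken, so nothing to release
                }
                try {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the maximum
                    Thread.sleep(50); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    semaphore.release();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency: " + maxConcurrency(2, 5));
    }
}
```

The returned peak can never exceed the permit count, because the counter is incremented only after acquire() and decremented before release().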
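Exclusive lock
This simply sets the initial permit count to 1, so the shared-mode semaphore behaves like an exclusive lock. Unlike ReentrantLock, it is not reentrant and has no owner: any thread may call release().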
    import java.io.IOException;
    import java.util.Random;
    import java.util.concurrent.Semaphore;

    public class ExclusiveLockDemo {
        public static void main(String[] args) throws IOException {
            // A single permit: only one thread runs the guarded section at a time.
            Semaphore semaphore = new Semaphore(1);
            for (int i = 0; i < 5; i++) {
                new MyThread(semaphore).start();
            }
            System.in.read(); // block the main thread until input arrives
        }

        static class MyThread extends Thread {
            private final Semaphore semaphore;

            public MyThread(Semaphore semaphore) {
                this.semaphore = semaphore;
            }

            @Override
            public void run() {
                try {
                    // Acquire before entering the inner try, so an interrupted
                    // acquire() never reaches release() without holding a permit.
                    semaphore.acquire();
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.sleep(1000 + new Random().nextInt(1000));
                    } finally {
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
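A one-permit semaphore can protect shared state the way a mutex does. The sketch below guards a plain `int` counter; `BinarySemaphoreCounter` and its `run` helper are names invented for this example, not part of the JDK.

```java
import java.util.concurrent.Semaphore;

public class BinarySemaphoreCounter {
    private final Semaphore mutex = new Semaphore(1);
    private int count; // only touched while holding the single permit

    void increment() throws InterruptedException {
        mutex.acquire();
        try {
            count++;
        } finally {
            mutex.release();
        }
    }

    // Hypothetical helper: `threads` workers each increment `perThread` times.
    public static int run(int threads, int perThread) throws InterruptedException {
        BinarySemaphoreCounter counter = new BinarySemaphoreCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < perThread; j++) {
                        counter.increment();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join(); // join() makes the workers' writes visible here
        }
        return counter.count;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 1000));
    }
}
```

With the semaphore in place no increment is lost, so four workers doing 1000 increments each end at 4000.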
Semaphore source code walkthrough
Constructors
By default the synchronizer is the non-fair NonfairSync.
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
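The choice made by the two constructors can be observed through the public `isFair()` method. A small sketch (the class name `FairnessFlag` is only for illustration):

```java
import java.util.concurrent.Semaphore;

public class FairnessFlag {
    public static void main(String[] args) {
        Semaphore byDefault = new Semaphore(2);   // one-argument constructor
        Semaphore fair = new Semaphore(2, true);  // explicitly request fairness

        System.out.println(byDefault.isFair()); // false
        System.out.println(fair.isFair());      // true
    }
}
```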
acquire()
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
acquireSharedInterruptibly()
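    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // Fail fast if the thread was already interrupted (this also clears the flag).
        if (Thread.interrupted())
            throw new InterruptedException();
        // A negative result means no permit was obtained, so fall back to queuing.
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }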
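One consequence of the `Thread.interrupted()` check at the top is that acquire() fails for an already-interrupted thread even when permits are free. The sketch below illustrates this; `InterruptedAcquire` and its helper method are hypothetical names.

```java
import java.util.concurrent.Semaphore;

public class InterruptedAcquire {
    // Returns true if acquire() threw, left the permit untouched,
    // and cleared the caller's interrupt flag.
    public static boolean acquireFailsWhenInterrupted() {
        Semaphore semaphore = new Semaphore(1);
        Thread.currentThread().interrupt(); // set the flag before acquiring
        try {
            semaphore.acquire();
            semaphore.release();
            return false; // not expected: the acquire went through
        } catch (InterruptedException e) {
            return semaphore.availablePermits() == 1
                    && !Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println(acquireFailsWhenInterrupted());
    }
}
```

Code that must not be interruptible can use acquireUninterruptibly() instead.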
tryAcquireShared(): try to acquire permits
Fair lock
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // Fair mode: give up at once if another thread is queued ahead of us.
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            // Negative means not enough permits; otherwise retry until the CAS succeeds.
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
Non-fair lock
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }

    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // No queue check here: a newly arriving thread may barge ahead of waiters.
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
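The CAS retry loop can be tried out in isolation. The sketch below is a simplified model, not the JDK code: a hypothetical `PermitCounter` class uses an `AtomicInteger` in place of the AQS state field and has no wait queue at all.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PermitCounter {
    // Stand-in for the AQS state: the number of permits currently available.
    private final AtomicInteger state;

    public PermitCounter(int permits) {
        this.state = new AtomicInteger(permits);
    }

    // Same shape as the non-fair loop: returns the remaining permits,
    // or a negative value if there were not enough.
    public int tryAcquireShared(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            if (remaining < 0 || state.compareAndSet(available, remaining)) {
                return remaining;
            }
            // CAS lost a race with another thread: re-read and try again.
        }
    }

    public int available() {
        return state.get();
    }

    public static void main(String[] args) {
        PermitCounter counter = new PermitCounter(2);
        System.out.println(counter.tryAcquireShared(1)); // 1
        System.out.println(counter.tryAcquireShared(1)); // 0
        System.out.println(counter.tryAcquireShared(1)); // -1
        System.out.println(counter.available());         // 0
    }
}
```

Note that a failed attempt returns before the CAS, so the stored count never goes negative.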
doAcquireSharedInterruptibly(): add the node to the wait queue
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // Enqueue the current thread as a shared-mode node.
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                // Only the node right behind the head may retry the acquire.
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // Become the new head and wake further shared waiters if permits remain.
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // Park until signalled; an interrupt while parked aborts the acquire.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
release()
    public void release() {
        sync.releaseShared(1);
    }
releaseShared()
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // Permits were returned; wake up queued waiters.
            doReleaseShared();
            return true;
        }
        return false;
    }
tryReleaseShared(): try to release permits
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // int overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }
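Two things follow from this loop: release() only adds to the count, so it works even without a prior acquire(), and the overflow check is the one case where it fails. A small sketch of both (the class and method names are illustrative only):

```java
import java.util.concurrent.Semaphore;

public class ReleaseBehavior {
    // release() without acquire() raises the count above its initial value.
    public static int releaseWithoutAcquire() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.release();
        return semaphore.availablePermits();
    }

    // Releasing at Integer.MAX_VALUE overflows the int counter.
    public static String releaseAtMaximum() {
        Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
        try {
            semaphore.release();
            return "no error";
        } catch (Error e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseWithoutAcquire()); // 2
        System.out.println(releaseAtMaximum());      // Maximum permit count exceeded
    }
}
```

Because the permit count is not capped at the constructor argument, callers should pair every release() with an earlier successful acquire().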
doReleaseShared()
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // Reset the status first; retry if another thread changed it.
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            // Stop once the head is unchanged; otherwise a woken thread moved it, so loop again.
            if (h == head)
                break;
        }
    }
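The wake-up logic is internal, but its effect can be observed from outside: one release(n) call lets n parked waiters through. The sketch below shows only that observable behavior, not the node handling itself; `WakeUpDemo`, the 10 ms polling interval, and the 5 second timeout are assumptions made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class WakeUpDemo {
    // Parks `waiters` threads on an empty semaphore, releases that many
    // permits in a single call, and reports whether every waiter got through.
    public static boolean releaseWakesAll(int waiters) throws InterruptedException {
        Semaphore semaphore = new Semaphore(0);
        CountDownLatch done = new CountDownLatch(waiters);
        for (int i = 0; i < waiters; i++) {
            Thread waiter = new Thread(() -> {
                try {
                    semaphore.acquire(); // blocks: no permits yet
                    done.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiter.setDaemon(true); // do not keep the JVM alive on failure
            waiter.start();
        }
        // getQueueLength() is only an estimate, so poll until all waiters are queued.
        while (semaphore.getQueueLength() < waiters) {
            Thread.sleep(10);
        }
        semaphore.release(waiters);
        return done.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(releaseWakesAll(2));
    }
}
```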
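Summary
Semaphore implements something similar to the semaphores that operating systems provide for inter-process communication. It starts with a given number of permits; acquire is the P operation (it takes permits) and release is the V operation (it returns them). A thread can go on to run its task only after it has obtained the permits it asked for, however many that is.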
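The P and V operations also work on several permits at once through acquire(int), tryAcquire(int) and release(int). A short sketch of the bookkeeping (the class name `PermitArithmetic` and the `trace` helper are illustrative):

```java
import java.util.concurrent.Semaphore;

public class PermitArithmetic {
    // Returns the observed values as one string so they are easy to inspect.
    public static String trace() throws InterruptedException {
        Semaphore semaphore = new Semaphore(5);
        semaphore.acquire(3);                        // P: take 3 of 5
        int afterAcquire = semaphore.availablePermits();
        boolean second = semaphore.tryAcquire(3);    // only 2 left, so this fails
        semaphore.release(3);                        // V: give the 3 back
        int afterRelease = semaphore.availablePermits();
        return afterAcquire + " " + second + " " + afterRelease;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(trace()); // 2 false 5
    }
}
```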