什么是Semaphore?

Semaphore是Java的一个同步类,用于协作多线程,同时也是一个共享锁

Semaphore使用场景

限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) throws IOException {
// 限制每次最多只有两个线程执行任务
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 5; i++) {
new MyThread(semaphore).start();
}
System.in.read();
}

static class MyThread extends Thread{

Semaphore semaphore;

public MyThread(Semaphore semaphore) {
this.semaphore = semaphore;
}

@Override
public void run() {
try {
// 每个工作线程获取一个信号量
semaphore.acquire();
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000 + new Random().nextInt(1000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
// 结束任务释放信号量
semaphore.release();
}
}
}

独占锁

其实就是把初始信号量改成1,用共享锁的模式实现独占锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) throws IOException {
Semaphore semaphore = new Semaphore(1);
for (int i = 0; i < 5; i++) {
new MyThread(semaphore).start();
}
System.in.read();
}

static class MyThread extends Thread{

Semaphore semaphore;

public MyThread(Semaphore semaphore) {
this.semaphore = semaphore;
}

@Override
public void run() {
try {
// 获取到锁
semaphore.acquire();
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000 + new Random().nextInt(1000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
semaphore.release();
}
}
}

Semaphore源码解析

构造器

默认的同步器是非公平的NonfairSync

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

acquire()

1
2
3
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly()

1
2
3
4
5
6
7
8
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁,判断为true,说明没有资源,这个节点需要阻塞
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

tryAcquireShared():尝试获取锁

公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected int tryAcquireShared(int acquires) {
for (;;) {
// 判断同步队列内有节点,说明本节点需要加入队列,返回-1
if (hasQueuedPredecessors())
return -1;
int available = getState();
// 这里acquire可以不是1
int remaining = available - acquires;
// 如果剩下的信号量 < 0 或者获取锁成功,返回剩余的信号量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 来了就抢,不用判断同步队列内是否有节点
int available = getState();
int remaining = available - acquires;
// 如果剩下的信号量 < 0 或者获取锁成功,返回剩余的信号量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

doAcquireSharedInterruptibly():把节点加入队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 把节点加入队列,并且设置节点waitstate == SHARED
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 返回当前节点的前驱节点
final Node p = node.predecessor();
// 如果当前节点的前驱节点是头节点
if (p == head) {
// 尝试获取锁,返回值r的意思是剩余的信号量
int r = tryAcquireShared(arg);
// 还有剩余的信号量,把当前节点设置为新的头节点
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 检查当前节点是否可以阻塞,如果前驱节点的waitstate == SIGNAL才会parkAndCheckInterrupt()阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
// 阻塞当前节点的线程
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 上述for循环内的步骤出现了异常,把当前节点移除出同步队列
if (failed)
cancelAcquire(node);
}
}

release()

1
2
3
public void release() {
sync.releaseShared(1);
}

releaseShared()

1
2
3
4
5
6
7
8
public final boolean releaseShared(int arg) {
// 尝试释放锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

tryReleaseShared():尝试释放锁

1
2
3
4
5
6
7
8
9
10
11
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS将信号量释放
if (compareAndSetState(current, next))
return true;
}
}

doReleaseShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void doReleaseShared() {

for (;;) {
Node h = head;
// 队列不空
if (h != null && h != tail) {
int ws = h.waitStatus;
// 头节点的waitStatus == SIGNAL
if (ws == Node.SIGNAL) {
// 设置头节点的waitStatus == 0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 设置成功才释放后继节点
unparkSuccessor(h);
}
// 当头节点waitstatus == 0,并且设置头节点waitstatus == PROPAGATE也可以返回
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

总结

Semaphore在实现了类似于操作系统进程间通信的信号量,一开始有固定的信号量,acquire就是P操作(减少信号量),release就是V操作(返还信号量),只有获取到信号量(不论多少)才能够执行自己的任务。