阻塞队列

阻塞队列用到了类似于生产者-消费者模式,和普通队列不同,在队列为空的时候消费动作会被阻塞,在队列满的时候生产动作会被阻塞。

ArrayBlockingQueue

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 底层存储元素的数组
final Object[] items;

// 在索引处取出元素
int takeIndex;

// 在索引处放入元素
int putIndex;

// 当前数组内的元素个数
int count;

// 用ReentrantLock实现并发安全
final ReentrantLock lock;

// 用Condition notEmpty实现消费者阻塞等待
private final Condition notEmpty;

// 用Condition notFull实现生产者阻塞等待
private final Condition notFull;

// 迭代器
transient Itrs itrs = null;

put()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 使用可打断加锁
lock.lockInterruptibly();
try {
// 阻塞队列特性,队列满生产阻塞,放到同步队列
while (count == items.length)
notFull.await();
// 到这里有两种可能,一个是可以正常放入新元素;另外就是上面的while阻塞等待条件被破坏,有任务被消费了
// 加入队列
enqueue(e);
} finally {
// 释放锁
lock.unlock();
}
}

enqueue(e)

1
2
3
4
5
6
7
8
9
10
11
private void enqueue(E x) {
final Object[] items = this.items;
// 放入元素
items[putIndex] = x;
// 为了避免指针超界限
if (++putIndex == items.length)
putIndex = 0;
count++;
// 可以让同步队列内的消费者被唤醒
notEmpty.signal();
}

take()

整体流程和之前的put()差不多,只不过这里的while循环的条件是队列为空

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

dequeue()

整体流程和之前的enqueue()差不多

1
2
3
4
5
6
7
8
9
10
11
12
13
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}

LinkedBlockingDeque

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static final class Node<E> {

E item;
Node<E> prev;
Node<E> next;

Node(E x) {
item = x;
}
}

// 链表的头节点
transient Node<E> first;

// 链表的尾节点
transient Node<E> last;

// 链表内的元素数量
private transient int count;

// 链表最大可装载的节点数量
private final int capacity;

// 用ReentrantLock实现并发安全
final ReentrantLock lock = new ReentrantLock();

// 用Condition notEmpty实现消费者阻塞等待
private final Condition notEmpty = lock.newCondition();

// 用Condition notFull实现生产者阻塞等待
private final Condition notFull = lock.newCondition();

put()

1
2
3
4
5
6
7
8
9
10
11
12
13
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 尾插法加入新节点
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}

linkLast()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private boolean linkLast(Node<E> node) {
// 如果链表节点数到达最大值,返回false,把生产者加入同步队列阻塞
if (count >= capacity)
return false;
// 尾插法插入新元素
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
// 唤醒消费者同步队列线程
notEmpty.signal();
return true;
}

take()

1
2
3
4
5
6
7
8
9
10
11
12
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

unlinkFirst()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private E unlinkFirst() {
// 移除头部节点
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
// 唤醒生产者同步队列的线程
notFull.signal();
return item;
}

写时复制容器

什么是写时复制?

写时复制是一种懒加载的设计思想,例如linux创建子进程,并不会拷贝父进程的数据,而是先用一个指针引用指向共享内存,等到父进程需要修改的时候,才会给子进程复制一份冲突的内存。

CopyOnWriteArrayList

成员变量

1
2
3
4
final transient ReentrantLock lock = new ReentrantLock();

// 存放元素的数组
private transient volatile Object[] array;

add()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean add(E e) {
// 上锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 创建新数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

get()

没有加锁

1
2
3
private E get(Object[] a, int index) {
return (E) a[index];
}

set()

和add()类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E set(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
E oldValue = get(elements, index);

if (oldValue != element) {
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len);
newElements[index] = element;
setArray(newElements);
} else {
// Not quite a no-op; ensures volatile write semantics
setArray(elements);
}
return oldValue;
} finally {
lock.unlock();
}
}

remove()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public E remove(int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
// 这里专门把index位置漏掉
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}

CopyOnWriteArraySet

成员变量

底层使用

1
private final CopyOnWriteArrayList<E> al;

add()

和CopyOnWriteArrayList相同

remove()

和CopyOnWriteArrayList相同

contains()

1
2
3
4
public boolean contains(Object o) {
Object[] elements = getArray();
return indexOf(o, elements, 0, elements.length) >= 0;
}

indexof()

遍历CopyOnWriteArrayList

1
2
3
4
5
6
7
8
9
10
11
12
13
private static int indexOf(Object o, Object[] elements,
int index, int fence) {
if (o == null) {
for (int i = index; i < fence; i++)
if (elements[i] == null)
return i;
} else {
for (int i = index; i < fence; i++)
if (o.equals(elements[i]))
return i;
}
return -1;
}

分段锁容器

ConcurrentHashMap