阻塞队列 阻塞队列采用了类似于生产者-消费者模式的设计。和普通队列不同,在队列为空的时候消费动作会被阻塞,在队列满的时候生产动作会被阻塞。
ArrayBlockingQueue 成员变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 final Object[] items;int takeIndex;int putIndex;int count;final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;transient Itrs itrs = null ;
put() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void put (E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
enqueue(e) 1 2 3 4 5 6 7 8 9 10 11 private void enqueue (E x) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); }
take() 整体流程和之前的put()差不多,只不过这里的while循环的条件是队列为空
1 2 3 4 5 6 7 8 9 10 11 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
dequeue() 整体流程和之前的enqueue()差不多
1 2 3 4 5 6 7 8 9 10 11 12 13 private E dequeue () { final Object[] items = this .items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; }
LinkedBlockingDeque 成员变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 static final class Node <E> { E item; Node<E> prev; Node<E> next; Node(E x) { item = x; } } transient Node<E> first;transient Node<E> last;private transient int count;private final int capacity;final ReentrantLock lock = new ReentrantLock ();private final Condition notEmpty = lock.newCondition();private final Condition notFull = lock.newCondition();
put() 1 2 3 4 5 6 7 8 9 10 11 12 13 public void putLast (E e) throws InterruptedException { if (e == null ) throw new NullPointerException (); Node<E> node = new Node <E>(e); final ReentrantLock lock = this .lock; lock.lock(); try { while (!linkLast(node)) notFull.await(); } finally { lock.unlock(); } }
linkLast() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private boolean linkLast (Node<E> node) { if (count >= capacity) return false ; Node<E> l = last; node.prev = l; last = node; if (first == null ) first = node; else l.next = node; ++count; notEmpty.signal(); return true ; }
take() 1 2 3 4 5 6 7 8 9 10 11 12 public E takeFirst () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lock(); try { E x; while ( (x = unlinkFirst()) == null ) notEmpty.await(); return x; } finally { lock.unlock(); } }
unlinkFirst() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private E unlinkFirst () { Node<E> f = first; if (f == null ) return null ; Node<E> n = f.next; E item = f.item; f.item = null ; f.next = f; first = n; if (n == null ) last = null ; else n.prev = null ; --count; notFull.signal(); return item; }
写时复制容器 什么是写时复制? 写时复制(Copy-On-Write)是一种延迟复制的设计思想。例如 Linux 通过 fork 创建子进程时,并不会立即拷贝父进程的数据,而是让父子进程先共享同一份物理内存页;等到其中任意一方需要修改某一页时,内核才会为写入方单独复制该页,另一方继续使用原来的页。
CopyOnWriteArrayList 成员变量 1 2 3 4 final transient ReentrantLock lock = new ReentrantLock ();private transient volatile Object[] array;
add() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public boolean add (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } finally { lock.unlock(); } }
get() 没有加锁,直接读取 volatile 数组的当前快照(下面是公有 get(int index) 内部调用的私有辅助方法)
1 2 3 private E get (Object[] a, int index) { return (E) a[index]; }
set() 和add()类似
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public E set (int index, E element) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); E oldValue = get(elements, index); if (oldValue != element) { int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len); newElements[index] = element; setArray(newElements); } else { setArray(elements); } return oldValue; } finally { lock.unlock(); } }
remove() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public E remove (int index) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; E oldValue = get(elements, index); int numMoved = len - index - 1 ; if (numMoved == 0 ) setArray(Arrays.copyOf(elements, len - 1 )); else { Object[] newElements = new Object [len - 1 ]; System.arraycopy(elements, 0 , newElements, 0 , index); System.arraycopy(elements, index + 1 , newElements, index, numMoved); setArray(newElements); } return oldValue; } finally { lock.unlock(); } }
CopyOnWriteArraySet 成员变量 底层使用 CopyOnWriteArrayList 存储元素
1 private final CopyOnWriteArrayList<E> al;
add() 委托给 CopyOnWriteArrayList 的 addIfAbsent():元素已存在时不会重复添加并返回 false,其余的加锁、复制数组流程与 CopyOnWriteArrayList 的 add() 类似
remove() 和CopyOnWriteArrayList相同
contains() 1 2 3 4 public boolean contains (Object o) { Object[] elements = getArray(); return indexOf(o, elements, 0 , elements.length) >= 0 ; }
indexOf() 遍历 CopyOnWriteArrayList 底层数组的快照
1 2 3 4 5 6 7 8 9 10 11 12 13 private static int indexOf (Object o, Object[] elements, int index, int fence) { if (o == null ) { for (int i = index; i < fence; i++) if (elements[i] == null ) return i; } else { for (int i = index; i < fence; i++) if (o.equals(elements[i])) return i; } return -1 ; }
分段锁容器 ConcurrentHashMap(JDK 7 及之前采用 Segment 分段锁;JDK 8 起改为 CAS + synchronized 锁住桶的头节点)略