不满足就阻塞等待
ArrayBlockingQueue
基于数组
ArrayBlockingQueue.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final Object[] items;
int takeIndex; int putIndex;
int count;
final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
}
|
构造
ArrayBlockingQueue.java1 2 3 4 5 6 7 8 9 10 11 12 13
| public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
|
插入数据
ArrayBlockingQueue.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
|
取出数据
ArrayBlockingQueue.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
|
LinkedBlockingQueue 和 LinkedBlockingDeque
单链表采取双锁结构,每个锁关联一个条件,这样插入和取出可以不冲突,吞吐量增强,代价是插入的数据不一定能及时被取出数据线程可见
LinkedBlockingQueue.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private final int capacity; private final AtomicInteger count = new AtomicInteger();
transient Node<E> head; private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); }
|
提供完全加解锁方法,供一些必须保证可见性的方法使用,比如remove
LinkedBlockingQueue.java1 2 3 4 5 6 7 8
| void fullyLock() { putLock.lock(); takeLock.lock(); } void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
|
基于链表实现的队列实际也是有界的,不强制指定容量,默认是整型最大值
LinkedBlockingQueue.java1 2 3 4 5 6 7 8
| public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
|
双向链表实现版本和数组类似,采用一锁加两条件方式
LinkedBlockingDeque.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { transient Node<E> first; transient Node<E> last;
private transient int count; private final int capacity;
final ReentrantLock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); }
|