什么是ArrayBlockingQueue
ArrayBlockingQueue底层是由数组实现的定长阻塞队列(阻塞表示如果没有原始那么获取元素会阻塞当前线程)
ArrayBlockingQueue用来干嘛
ArrayBlockingQueue一般用于生产者消费者模型业务(排队机制,先进先出)
源码解析
数据的存储
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; /** The queued items 存储元素容器*/ final Object[] items; /** items index for next take, poll, peek or remove 使用过的元素 */ int takeIndex; /** items index for next put, offer, or add 添加过的元素 */ int putIndex; /** Number of elements in the queue 当前元素数量 */ int count;
|
数据的操作
add
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public boolean add(E e) { return super.add(e); } super.add public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } public boolean offer(E e) { checkNotNull(e);//ArrayBlockingQueue不能存储null对象 final ReentrantLock lock = this.lock;//插入操作线程安全 lock.lock(); try { if (count == items.length)//如果当前count==items.length表示队列已经忙了,不能插入 return false; else { insert(e);//插入元素 return true; } } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x;//第一次put为0 putIndex = inc(putIndex);//递增 ++count;//数量递增 notEmpty.signal();//通知获取原始方法可以进行获取 } final int inc(int i) {//如果当前putIndex==items.length那么putIndex重新从零开始 return (++i == items.length) ? 0 : i; } //同样为添加元素,lock.lockInterruptibly如果检测到有Thread.interrupted();会直接抛出异常 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } }
|
remove
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { if (o.equals(items[i])) {//从头部开始遍历元素判断 removeAt(i); return true; } } return false; } finally { lock.unlock(); } } //queue size = 10 putSize = 5 tackSize = 0 //queue 1,2,3,4,5 removeAt 3 step1: removeAt != takeIndex i = nexti = 4 void removeAt(int i) { final Object[] items = this.items; // if removing front item, just advance if (i == takeIndex) { items[takeIndex] = null;//引用设置为空 takeIndex = inc(takeIndex);//takeIndex++ } else { // slide over all others up through putIndex. for (;;) { int nexti = inc(i);//>队列的头部 递增(putIndex一个循环的0-n) if (nexti != putIndex) {//递增后部位putIndex全部向前移动位置 items[i] = items[nexti]; i = nexti; } else { items[i] = null;//元素设置为空 putIndex = i; break; } } } --count;//元素递减 notFull.signal();//通知notFull.awit() }
|
update
public E poll() {//获取队列头部元素,获取后设置为空
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();//如果当前队列为空直接返回null,不为空调用extract()
} finally {
lock.unlock();
}
}
//获取队列头部元素,获取后设置为空
//take获取原始如果队列为空会进入阻塞状态知道等到有添加元素才会去返回
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//lock.lockInterruptibly如果检测到有Thread.interrupted();会直接抛出异常
try {
while (count == 0)
notEmpty.await();//如果没有元素进入等待状态,等待被唤醒
return extract();
} finally {
lock.unlock();
}
}
//peek查看队列头部元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : itemAt(takeIndex);//如果元素为空直接返回null,不为空条用itemAt(takeIndex)
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
E x = this.cast(items[takeIndex]);//泛型转换并且获得当前元素
items[takeIndex] = null;//当前元素设置为空
takeIndex = inc(takeIndex);//获取原始递增
–count;//队列元素递减
notFull.signal();//通知notFull.await()可以进行插入元素
return x;//返回当前获取原始
}
//获取元素
final E itemAt(int i) {
return this.cast(items[i]);
}
```
什么时候扩容
定长队列,不能进行扩容
是否线程安全
线程安全
使用注意事项
- ArrayBlockingQueue为定长队列
- ArrayBlockingQueue的添加和获取方法都有提供阻塞和非阻塞的根据需要使用
引用