LinkedBlockingQueue源码解析

什么是LinkedBlockingQueue

LinkedBlockingQueue底层是由节点链表实现的定长阻塞队列(阻塞表示如果没有原始那么获取元素会阻塞当前线程)

LinkedBlockingQueue用来干嘛

LinkedBlockingQueue一般用于生产者消费者模型业务(排队机制,先进先出)

源码解析

数据的存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;
/**
* Linked list node class
*/
static class Node<E> {//存储数据的节点
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;//链表的最大长度,如果不设置值默认为Integer.MAX_VALUE
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger(0);//统计数量线程安全
/**
* Head of linked list.
* Invariant: head.item == null
*/
private transient Node<E> head;//头节点
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;//尾节点
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();//tackLock
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();//tackLock条件不为空
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();//putLock
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();//putLock条件没满
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);//默认last=head=空节点
}

数据的操作

add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();//不能存储空元素
int c = -1;
Node<E> node = new Node(e);//创建节点
final ReentrantLock putLock = this.putLock;//获得putLock
final AtomicInteger count = this.count;//获取当前数量
putLock.lockInterruptibly();//获取锁,如果有调用Thread.Interrupted()直接抛出异常
try {
while (count.get() == capacity) {//如果当前队列以满,进入等待状态
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public boolean offer(E e, long timeout, TimeUnit unit) offer(e)类似
throws InterruptedException {
if (e == null) throw new NullPointerException();//不能存储空元素
long nanos = unit.toNanos(timeout);//装换为纳秒
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);//等待一段时间
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();//递增
if (c + 1 < capacity)//如果未满唤醒notFull.awit
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//唤醒notEmpty.await()
return true;
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
//拆分为两步 last.next = node,last = node
//每次head.next=当前的last然后last.next指向node
last = last.next = node;
}

remove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();//删除数据时全部lock
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;//前后元素执行,大年元素设置为空
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)//count获取数量同时递减(获取数量为递减钱数量)
notFull.signal();//唤醒 notFull.await()
}

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
//获取元素,消费,可能被中断
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//如果有调用Thread.Interrupted()抛出异常
try {
while (count.get() == 0) {
notEmpty.await();//元素为空进入等待状态
}
x = dequeue();//
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//获取元素,消费
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//查看元素
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
[null,aaa,bbb] queue
[null,bbb] delete after queue
去掉头部null元素获取aaa元素修改aaa元素的item=null
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;//first第一个有值的节点
h.next = h; // help GC
head = first;
E x = first.item;//获取元素
first.item = null;//设置为空
return x;
}

什么时候扩容

定长链表不支持扩容

是否线程安全

线程安全

使用注意事项

  • 默认创建方式链表醉大长度为Ineger.MAX_SIZE

引用