1.简介
在上一篇文章中我们分析了无界队列PriorityBlockingQueue,本篇我们分析的LinkedBlockingQueue是一个基于链表实现的有固定容量的 FIFO 的阻塞队列,队列的头部元素一直在队列中时间最长。队列的尾部是该元素已经在队列中的时间最短。新元素插入到队列的尾部,并且队列检索操作获取在队列的头部元素,如果没有指定容量,则等于Integer.MAX_VALUE ,通常情况下,我们应该手动设置一个容量避免队列过大造成机器负载或者内存爆满的情况出现
它和ArrayBlockingQueue的关系,就类似于,LinkList和Array的关系,LinkedBlockingQueue底层是基于链表的阻塞队列,ArrayBlockingQueue底层是基于数组的阻塞队列,通常来说,LinkedBlockingQueue 的吞吐量要好于 ArrayBlockingQueue.

AbstractQueue:提供给它队列的一些基本实现操作
BlockingQueue:则实现它的阻塞操作
Serializable:序列化
2.源码分析
2.1.类属性分析
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;
/*
用于存储节点,它只有一个next节点,它是单向的FIFO
像LinkedBlockingDeque,它的node存在prev指针,是一个双向的阻塞队列,即它支持FIFO、FILO
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//容量,如果没有设置则为Integer.MAX_VALUE
private final int capacity;
//阻塞队列元素的个数
private final AtomicInteger count = new AtomicInteger();
//头节点
transient Node<E> head;
//尾节点
private transient Node<E> last;
//获取元素使用的锁,如take, poll
private final ReentrantLock takeLock = new ReentrantLock();
//用于队列在空时,挂起删除元素的线程
private final Condition notEmpty = takeLock.newCondition();
//添加元素时使用的锁如 put, offer
private final ReentrantLock putLock = new ReentrantLock();
//用于队列满的时候,挂起添加元素的线程
private final Condition notFull = putLock.newCondition();
从上面的属性我们可以知道,它的数据是封装在node节点中的,且只有一个next指针,head和last表示队列的头部和尾部,新插入的元素是指向尾部插入,它使用两把锁takeLock(控制删除)和putLock(控制添加)这点和ArrayBlockingQueue不同(只有一把锁),所以说它的添加操作和删除操作是可以同时进行的,这么来看的话,它的吞吐量是要远大于ArrayBlockingQueue的,对于它的两把锁,各提供了一个Condition队列,用来处理自己线程的挂起和唤醒(即线程通信)
2.2.构造方法
public LinkedBlockingQueue() {
//不传,默认Integer.MAX_VALUE
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
//容量
this.capacity = capacity;
//首尾指向同一个节点,节点值为空,next还没有指定
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
//获取插入锁
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
//遍历向尾部插入元素
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
它的构造只有第2个才是创建一个有容量的队列,第一个和第三个都是Integer.MAX_VALUE
2.3.入队
提供了3个入队方法,分别是
| 入队方法 | 含义 | 返回值 |
|---|---|---|
| offer(E e) | 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false | true/false |
| offer(E e, long timeout, TimeUnit unit) | 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用,添加成功返回true,否则返回false | true/false |
| put(E e) | 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用,没有返回值 | 无返回值 |
2.3.1. offer(E e)
public boolean offer(E e) {
//检查e值
if (e == null) throw new NullPointerException();
//队列元素的个数
final AtomicInteger count = this.count;
//容量已经满了,直接返回
if (count.get() == capacity)
return false;
int c = -1;
//创建一个node节点
Node<E> node = new Node<E>(e);
//获取写锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//再次判断容量
if (count.get() < capacity) {
//把node添加到尾部,last指向添加的元素
enqueue(node);
//元素个数+1
c = count.getAndIncrement();
//还有剩余空间
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//解锁
putLock.unlock();
}
//如果队列等于空
if (c == 0)
//通知获取的线程阻塞
signalNotEmpty();
//返回成功插入或者失败插入
return c >= 0;
}
private void signalNotEmpty() {
//获取take锁,用户获取或出队
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//唤醒take线程的condition等待队列
notEmpty.signal();
} finally {
//解锁
takeLock.unlock();
}
}
offer方法,如果容量不足会直接返回,不会等待,否则的话,会首先添加写锁,然后在尾部加入要插入的节点,同时last节点,即 尾节点会执行插入的节点
2.3.1. offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
//等待锁的最长时间
long nanos = unit.toNanos(timeout);
int c = -1;
//锁变量
final ReentrantLock putLock = this.putLock;
//元素计数器
final AtomicInteger count = this.count;
//可中断加锁,就是如果线程发生中断,自己不会处理异常而是抛出去,让上层处理,看它的方法都是直接抛异常的
putLock.lockInterruptibly();
try {
//队列满了
while (count.get() == capacity) {
//如果超时,则返回插入失败
if (nanos <= 0)
return false;
//当前线程等待 nanos这么多时间,直到发出别的线程发出信号唤醒或者是线程中断,或者是超时过期
nanos = notFull.awaitNanos(nanos);
}
//队列尾部插入元素
enqueue(new Node<E>(e));
//元素个数+1
c = count.getAndIncrement();
//还有剩余空间
if (c + 1 < capacity)
//插入线程会被唤醒
notFull.signal();
} finally {
//解锁
putLock.unlock();
}
//队列为空
if (c == 0)
//获取线程等待
signalNotEmpty();
return true;
}
这个方法和上面的put方法的差距在于,在容量满的时候,它会等待一段时间,直到被唤醒被超时被中断才会继续执行,而不是直接返回
2.3.1. put(E e)
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
//创建节点
Node<E> node = new Node<E>(e);
//获取put锁
final ReentrantLock putLock = this.putLock;
//元素个数
final AtomicInteger count = this.count;
//可中断加锁,就是如果线程发生中断,自己不会处理异常而是抛出去,让上层处理,看它的方法都是直接抛异常的
putLock.lockInterruptibly();
try {
//容量已经满了,会不断等待,直到被中断或者被唤醒
while (count.get() == capacity) {
notFull.await();
}
//添加元素
enqueue(node);
//元素个数+1
c = count.getAndIncrement();
//还有剩余容量
if (c + 1 < capacity)
//唤醒插入线程
notFull.signal();
} finally {
//解锁
putLock.unlock();
}
//队列为空
if (c == 0)
//获取线程阻塞
signalNotEmpty();
}
当容量满的时候会不断的等待,直到线程被中断或被唤醒
对比三个插入方法,大家可以看见在使用put方法时是没有只要正常情况是一定会插入元素的,而offer方法则不同,如果容量不足或者等待超时,会直接返回插入失败的
2.4.出队
出队也提供了几个方法,如下
| 方法名 | 含义 |
|---|---|
| take() | 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 |
| poll() | 获取并移除此队列的头,如果此队列为空,则返回 null |
| poll(long timeout, TimeUnit unit) | 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要) |
2.4.1.take()
public E take() throws InterruptedException {
E x;
int c = -1;
//计数器
final AtomicInteger count = this.count;
//take锁
final ReentrantLock takeLock = this.takeLock;
//可中断加锁,就是如果线程发生中断,自己不会处理异常而是抛出去,让上层处理,看它的方法都是直接抛异常的
takeLock.lockInterruptibly();
try {
//队列为空
while (count.get() == 0) {
//不断等待,直到被唤醒或线程中断
notEmpty.await();
}
//出队
x = dequeue();
//元素个数-1(先返回值,然后在-1)
c = count.getAndDecrement();
//如果还有元素
if (c > 1)
//唤醒其它出队线程
notEmpty.signal();
} finally {
//解锁
takeLock.unlock();
}
//如果之前队列是满的,
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
//头结点,头结点的值是一个空节点
Node<E> h = head;
//真正存在元素的第一个节点
Node<E> first = h.next;
//让头节点指向自己,即去掉next指针,帮助GC
h.next = h; // help GC
//头节点移动的第一个节点
head = first;
//获取第一个节点的值
E x = first.item;
//把第一个节点的值置为空,作为头节点
first.item = null;
//返回第一个节点的值
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
//上锁
putLock.lock();
try {
//通知入队线程,队列还没满
notFull.signal();
} finally {
putLock.unlock();
}
}
take操作,返回头节点的下一个节点的值,即第一个真正元素的值,正常情况下方法是一定有返回的,桶put操作类似
2.4.2.poll()
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
代码和上面的take操作很像,只是在队列是空的时候,不等待了直接返回空
2.4.3.poll(long timeout, TimeUnit unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
和上面的代码差别就是,队列为空的时候,它会等一段时间,直到超时,中断或被唤醒
2.5.获取元素
public E peek() {
//队列为空,直接返回空
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
//上锁,在读的时候,不能够出队
takeLock.lock();
try {
//返回第一个元素的值
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
//解锁
takeLock.unlock();
}
}
和出队不同的是,它不会删除元素,只是返回第一个元素
2.6.获取元素
public boolean remove(Object o) {
if (o == null) return false;
//都上锁
fullyLock();
try {
//从头部节点开始遍历
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//如果相等
if (o.equals(p.item)) {
//执行删除
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
//p:要删除的节点
//trail:要删除节点的前一个节点
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
//把p置为空,方便GC
p.item = null;
//让p的前一个节点的next节点指向p的下一个节点,即删除p节点
trail.next = p.next;
//如果删除的是尾节点
if (last == p)
尾部节点的p的前一个节点
last = trail;
//getAndDecrement:先返回容量,然后执行-1
//如果之前的容量已经满了,那么-1之后就又有空间了
if (count.getAndDecrement() == capacity)
//唤醒入队线程进行入队
notFull.signal();
}
3.总结
LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象来实现线程的等待和唤醒
LinkedBlockingQueue它和ArrayBlockingQueue的不同点
1:ArrayBlockingQueue一定是有界的而LinkedBlockingQueue可以是有界也可以是无界的(没有设置容量时)
2.ArrayBlockingQueue底层采用的是数组作为存储容器而LinkedBlockingQueue则是链表,
3.ArrayBlockingQueue在插入和删除时不会产生临时对象,是移动位置而LinkedBlockingQueue则会产生一些临时对象,当处理高并发大数据时,为了回收这些对象,对GC的影响较大
4.ArrayBlockingQueue它的插入删除操作都是一把锁而LinkedBlockingQueue则把插入和删除操作分开了,分别使用了一把锁,这样在高并发的情况下生产者和消费者可以并行的操作队列中的数据,提高整体的吞吐量
到这里我们就分析完了整个LinkedBlockingQueue的核心方法,如果在分析过程中存在什么错误的,希望大家指出来,最后谢谢大家




