Java 并发编程之 LinkedBlockingQueue 阻塞队列
Java juc About 4,461 words介绍
LinkedBlockingQueue
的put
和take
为阻塞方法。
put
:队列满时等待放入。
take
:队列空时等待获取。
初始化
初始化链表。头节点是占位节点(Dummy
节点、哑元节点)
transient Node<E> head;
private transient Node<E> last;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
入队
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
出队
出队时,把item
置为空,表示已成为占位节点。
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
加锁分析
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
存数据
put
队列满时阻塞等待。
notFull.await();
:等待队列不满,继续执行
c == 0
:c
是加1
前的值,所以如果队列中有一个元素,唤醒notEmpty
的等待,让消费者去消费。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final int c;
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
offer
队列满时直接返回false
。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
final int c;
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() == capacity)
return false;
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
add
队列满时抛出异常。
// java.util.AbstractQueue#add
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
取数据
take
队列为空时阻塞等待。
c == capacity
:当队列只有一个空位时,唤醒put
线程。
public E take() throws InterruptedException {
final E x;
final int c;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
poll
队列为空时返回null
。
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
final E x;
final int c;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() == 0)
return null;
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
remove
队列为空时抛出异常。
// java.util.AbstractQueue#remove
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
Views: 1,833 · Posted: 2021-11-05
————        END        ————
Give me a Star, Thanks:)
https://github.com/fendoudebb/LiteNote扫描下方二维码关注公众号和小程序↓↓↓
Loading...