阻塞队列
912字约3分钟
2024-08-08
队列是 FIFO
(先进先出),阻塞有两种情况
写入:队列满了,就必须阻塞等待
读取:队列是空的,就必须阻塞等待生产者写入
BlockingQueue
Interface BlockingQueue<E>
已知实现类
ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue:同步队列
DelayQueue
LinkedBlockingDeque
LinkedTransferQueue
PriorityBlockingQueue
什么情况下使用:多线程并发处理、线程池
使用
方式 | 抛出异常 | 不抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(E e, long timeout, TimeUnit unit) |
移除 | remove() | poll() | take() | poll(long timeout, TimeUnit unit) |
检查队首元素 | element() | peek() | - | - |
/**
 * Demonstrates the exception-throwing group of queue operations:
 * {@code add()}, {@code element()} and {@code remove()}.
 *
 * <p>Prints the result of every call. The last {@code remove()} is issued on an
 * empty queue on purpose, so this method always finishes by throwing.
 *
 * @throws java.util.NoSuchElementException always, from the final {@code remove()}
 */
public static void test1() {
    // ArrayBlockingQueue is bounded: the capacity must be given up front.
    ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);
    System.out.println(blockingQueue.add("apple"));
    System.out.println(blockingQueue.add("watermelon"));
    // The queue is full now; one more add() would throw
    // java.lang.IllegalStateException: Queue full
    // System.out.println(blockingQueue.add("mango"));

    // element() returns the head without removing it: apple
    System.out.println(blockingQueue.element());
    System.out.println(blockingQueue.remove());
    System.out.println(blockingQueue.remove());
    // The queue is empty now, so this throws java.util.NoSuchElementException.
    System.out.println(blockingQueue.remove());
}
====== 源码 ======
// ArrayBlockingQueue
// Pure delegation: forwards to the superclass add(e) and returns its result unchanged.
public boolean add(E e) {
return super.add(e);
}
// Method defined in AbstractQueue
// Throwing insert built on top of offer(e): returns true when offer(e) succeeds,
// otherwise throws IllegalStateException("Queue full").
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
// AbstractQueue (ArrayBlockingQueue inherits this no-arg remove(); it does not declare its own)
// Throwing variant of poll(): a null result from poll() is converted into
// NoSuchElementException, any other result is returned as-is.
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
// AbstractQueue (ArrayBlockingQueue inherits element(); it does not declare its own)
// Throwing variant of peek(): a null result from peek() is converted into
// NoSuchElementException, any other result is returned as-is.
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
/**
 * Demonstrates the non-throwing group of queue operations:
 * {@code offer()}, {@code peek()} and {@code poll()}.
 *
 * <p>Failure is reported through the return value ({@code false} or {@code null})
 * instead of an exception. Prints the result of every call.
 */
public static void test2() {
    // ArrayBlockingQueue is bounded: the capacity must be given up front.
    ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);
    System.out.println(blockingQueue.offer("apple"));
    System.out.println(blockingQueue.offer("watermelon"));
    // The queue is full: no exception, offer() just returns false.
    System.out.println(blockingQueue.offer("mango"));

    // peek() returns the head without removing it: apple
    System.out.println(blockingQueue.peek());
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    // The queue is empty: no exception, poll() just returns null.
    System.out.println(blockingQueue.poll());
}
====== 源码 ======
// ArrayBlockingQueue
// Non-blocking insert: returns false at once when the queue is full, otherwise
// enqueues e and returns true. The check and the insert happen under the queue's lock.
public boolean offer(E e) {
checkNotNull(e); // NOTE(review): presumably rejects null with NullPointerException — helper not shown here
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) // element count has reached the backing array's length: full
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock(); // released on every exit path
}
}
// ArrayBlockingQueue
// Non-blocking removal: returns null when count is 0, otherwise the result of
// dequeue(). Runs under the queue's lock and never waits for an element.
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock(); // released on every exit path
}
}
// ArrayBlockingQueue
// Returns the element stored at takeIndex without removing it; takes the queue's
// lock for the read.
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock(); // released on every exit path
}
}
/**
 * Demonstrates the blocking group of queue operations: {@code put()} and
 * {@code take()}, which wait indefinitely.
 *
 * <p>The third {@code take()} is issued on an empty queue on purpose and nothing
 * ever refills it, so this method does not return unless the calling thread is
 * interrupted.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public static void test3() throws InterruptedException {
    // ArrayBlockingQueue is bounded: the capacity must be given up front.
    ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);
    blockingQueue.put("apple");
    blockingQueue.put("watermelon");
    // The queue is full now; a third put() would block until space frees up:
    // blockingQueue.put("mango");

    System.out.println(blockingQueue.take());
    System.out.println(blockingQueue.take());
    // The queue is empty now, so this take() blocks forever.
    System.out.println(blockingQueue.take());
}
===== 源码 =====
// ArrayBlockingQueue
// Blocking insert: waits on notFull for as long as the queue is full, then enqueues e.
// The lock is taken with lockInterruptibly(), and the method declares InterruptedException.
public void put(E e) throws InterruptedException {
checkNotNull(e); // NOTE(review): presumably rejects null with NullPointerException — helper not shown here
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// The full-queue condition is re-checked in a loop after every wakeup.
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock(); // released on every exit path, including an exception from await()
}
}
// ArrayBlockingQueue
// Blocking removal: waits on notEmpty for as long as count is 0, then returns dequeue().
// The lock is taken with lockInterruptibly(), and the method declares InterruptedException.
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// The empty-queue condition is re-checked in a loop after every wakeup.
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock(); // released on every exit path, including an exception from await()
}
}
/**
 * Demonstrates the timed group of queue operations:
 * {@code offer(e, timeout, unit)} and {@code poll(timeout, unit)}, which wait
 * for at most the given time and then give up.
 *
 * <p>Prints the outcome of both timed calls so the timeout result
 * ({@code false} / {@code null}) is visible. Takes about 4 seconds to run
 * because both timed calls are set up to time out.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public static void test4() throws InterruptedException {
    // ArrayBlockingQueue is bounded: the capacity must be given up front.
    ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);
    // Plain offer() never blocks; these two calls simply fill the queue.
    blockingQueue.offer("apple");
    blockingQueue.offer("watermelon");
    // The queue is full: wait up to 2 seconds for space, then give up and return false.
    System.out.println(blockingQueue.offer("mango", 2, TimeUnit.SECONDS));
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    // The queue is empty: wait up to 2 seconds for an element, then give up and return null.
    System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
}
===== 源码 =====
// ArrayBlockingQueue
// Timed insert: waits on notFull while the queue is full, but only until the timeout
// budget is used up. Returns true once e is enqueued, false if the budget runs out first.
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e); // NOTE(review): presumably rejects null with NullPointerException — helper not shown here
long nanos = unit.toNanos(timeout); // whole budget converted to nanoseconds once
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0) // budget exhausted while still full
return false;
// Carry the returned value into the next iteration so a wakeup does not restart
// the full timeout (assuming notFull is a java.util.concurrent.locks.Condition,
// whose awaitNanos returns the remaining time).
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock(); // released on every exit path
}
}
// ArrayBlockingQueue
// Timed removal: waits on notEmpty while count is 0, but only until the timeout budget
// is used up. Returns dequeue() once an element is present, null if the budget runs out first.
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout); // whole budget converted to nanoseconds once
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0) // budget exhausted while still empty
return null;
// Carry the returned value into the next iteration so a wakeup does not restart
// the full timeout (assuming notEmpty is a java.util.concurrent.locks.Condition,
// whose awaitNanos returns the remaining time).
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock(); // released on every exit path
}
}
SynchronousQueue
同步队列,和其他 BlockingQueue
不一样,没有容量,不存储元素:一个线程 put
一个元素之后就阻塞,必须等另一个线程把它 take
出来之后才能继续往下执行
/**
 * Demonstrates the hand-off behaviour of {@link SynchronousQueue}.
 *
 * <p>Thread T1 puts "1", "2", "3" back to back; thread T2 sleeps 2 seconds before
 * each take. Each thread prints its own progress, with T1 printing before it
 * calls {@code put}.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Synchronous queue shared by the producer and the consumer.
    BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

    new Thread(() -> {
        try {
            for (int i = 1; i <= 3; i++) {
                System.out.println(Thread.currentThread().getName() + " put " + i);
                blockingQueue.put(String.valueOf(i));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status so code further up the stack can still see it.
            Thread.currentThread().interrupt();
        }
    }, "T1").start();

    new Thread(() -> {
        try {
            for (int i = 0; i < 3; i++) {
                // Delay each take so the producer's wait is visible in the output timing.
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status so code further up the stack can still see it.
            Thread.currentThread().interrupt();
        }
    }, "T2").start();
}
// 输出示例(T2 的 take 打印与 T1 下一次 put 的打印存在竞争,这两行的先后顺序不保证):
// T1 put 1
// T2 take 1
// T1 put 2
// T2 take 2
// T1 put 3
// T2 take 3
===== 源码 =====
// SynchronousQueue
// Hands e to transferer.transfer(e, false, 0). Throws NullPointerException for a null
// element, and InterruptedException when the transfer returns null.
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// NOTE(review): a null result is treated as "interrupted" here; the transferer's
// contract is not shown in this excerpt.
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted(); // clears the current thread's interrupt status before throwing
throw new InterruptedException();
}
}
// SynchronousQueue
// Requests an element via transferer.transfer(null, false, 0). Returns it when non-null;
// a null result is reported as InterruptedException.
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
// NOTE(review): a null result is treated as "interrupted" here; the transferer's
// contract is not shown in this excerpt.
Thread.interrupted(); // clears the current thread's interrupt status before throwing
throw new InterruptedException();
}