阻塞队列,它是一个特殊的队列,符合先进先出的特点,是并发编程中的一个重要基础组件
阻塞队列的一个典型应用场景就是 “生产者消费者模型”;其作为生产者—消费者模型中的交易场所
以包饺子为例:
(假设面和馅是备好的,且 擀面杖只有一个~)
方法1: A,B,C 三个人包饺子;这三个人分别进行 擀皮 + 包饺子 过程
该方法的锁竞争(擀面杖只有一个)太激烈
方法2: 一个人负责擀皮,另外两个负责包饺子,例:A 负责擀皮,B 和 C 负责包饺子
这就是"生产者—消费者 模型"
A: 生产者 — (生产饺子皮)
B,C:消费者 — (消费饺子皮)
此处应还有一个"交易场所":放饺子皮的东西,比如:大盘子~
阻塞队列就是生产者—消费者生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
实现阻塞队列前,我们先思考普通队列是如何实现的??
有两种方式:
1.基于链表;
2.基于数组 — 循环队列
以基于数组为例:
即,通过循环队列实现:
class BlockingQueue{
private int[] array = new int[10];
// [head,tail)
// 两者重合时,队列可能为空,也可能为满
private volatile int head = 0;
private volatile int tail = 0;
private volatile int size = 0; // 有效元素个数
/* * 阻塞队列 入队列 * 为了和普通队列入队列区分,使用 put * */
public void put(int value) throws InterruptedException {
synchronized (this){
// 若队列满了, 阻塞等待, 等下面的出队列操作调用 notify 方法后才可继续执行
if(size == array.length){
wait();
}
array[tail] = value;
tail++;
if(tail == array.length){
tail = 0;
}
size++;
// 唤醒 出队列操作
notify();
}
}
/* * 阻塞队列 出队列 * 为了和普通队列出队列区分,使用 take * */
public int take() throws InterruptedException {
int ret = -1;
synchronized (this){
// 若队列为空, 阻塞等待, 等到有元素入队列再开始
if(size == 0){
wait();
}
ret = array[head];
head++;
if(head == array.length){
head = 0;
}
size--;
// 唤醒 入队列操作
notify();
}
return ret;
}
}
上述两个 wait( ) 是一定不可能被同时调用的!!
创建两个线程,分别模拟 生产者 和 消费者
以第 2 种为例:
public static void main(String[] args) {
BlockingQueue blockingQueue = new BlockingQueue();
// 创建两个线程,分别模拟 生产者 和 消费者
Thread producer = new Thread(){
@Override
public void run(){
for (int i = 0; i < 10000; i++) {
try {
blockingQueue.put(i);
System.out.println("生产元素: " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
producer.start();
Thread consumer = new Thread(){
@Override
public void run(){
while (true){
try {
int ret = blockingQueue.take();
System.out.println("消费元素: " + ret);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
consumer.start();
}
截取部分输出结果:
假设把上述代码中的 notify( ) 改成 notifyAll( ),此时会发生什么??
假设现在有三个线程,其中一个线程生产,两个线程消费,且消费速度快于生产速度
所以,两个消费者线程都触发了 wait 操作,也就是都发生了阻塞;当我们调用 notifyAll( ) ,会将上述两个线程都唤醒,然后这两个线程都去尝试重新竞争锁
假设:
消费者1,先获取到锁,于是执行出队列操作(执行完毕释放锁)
消费者2,后获取到锁,于是也会执行后续的出队列操作,但是刚才生产者生产的一个元素,已经被消费者1线程 给取走了,即,当前实际是一个空的队列,若强行执行出队列操作,就会出现逻辑上的错误!!
改正方法:
将 wait( ) 方法包裹的 if 改为 while
// 入队列
while (size == array.length){
wait();
}
// 出队列
while (size == array.length){
wait();
}
此时两个消费者线程尝试竞争锁
消费者1,先获取到锁,wait( ) 就返回了,再次执行 while 中的条件(由于当前生产者线程生产了一个元素,size 不为0 ),循环退出,之后,消费者1 就可以执行后续出队列操作,执行完毕后,释放锁
消费者2,后获取到锁,wait( ) 返回,再次执行 while 中的条件(由于刚才的消费者1 已经把生产的元素取走了,size 又是 0),循环继续执行,又一次调用 wait( ),只能继续等…
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/m0_47988201/article/details/121655732
内容来源于网络,如有侵权,请联系作者删除!