Java 阻塞队列

x33g5p2x  于2021-12-09 转载在 Java  
字(2.9k)|赞(0)|评价(0)|浏览(441)

定义

阻塞队列,它是一个特殊的队列,符合先进先出的特点,是并发编程中的一个重要基础组件
阻塞队列的一个典型应用场景就是 “生产者消费者模型”;其作为生产者—消费者模型中的交易场所

简单介绍 生产者—消费者 模型

以包饺子为例:
(假设面和馅是备好的,且 擀面杖只有一个~)

方法1: A,B,C 三个人包饺子;这三个人分别进行 擀皮 + 包饺子 过程
该方法的锁竞争(擀面杖只有一个)太激烈

方法2: 一个人负责擀皮,另外两个负责包饺子,例:A 负责擀皮,B 和 C 负责包饺子
这就是"生产者—消费者 模型"
A: 生产者 — (生产饺子皮)
B,C:消费者 — (消费饺子皮)
此处应还有一个"交易场所":放饺子皮的东西,比如:大盘子~

阻塞队列就是生产者—消费者生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.

阻塞队列特点:

  • 若入队列操作太快,队列满了,继续入队列,就会阻塞;一直阻塞到其他线程去消费队列了,才能继续入队列 (大盘子上很快就放满了饺子皮,生产者就要等待,一直等到消费者消费了一些数据(饺子皮),即:交易场所上有了空位,才能继续生产)
  • 若出队列操作太快,队列空了,继续出队列,也会阻塞;一直阻塞到其他线程生产了元素,才能继续出队列 (大盘子上很快就空了,饺子皮很快被"消费"完了,消费者就要等待,一直等到生产者生产了新的元素(饺子皮),消费者才能继续消费)

代码实现阻塞队列

实现阻塞队列前,我们先思考普通队列是如何实现的??
有两种方式:
1.基于链表;
2.基于数组 — 循环队列

以基于数组为例:
即,通过循环队列实现:

class BlockingQueue{
    private int[] array = new int[10];
    // [head,tail)
    // 两者重合时,队列可能为空,也可能为满
    private volatile int head = 0;
    private volatile int tail = 0;
    private volatile int size = 0; // 有效元素个数

    /* * 阻塞队列 入队列 * 为了和普通队列入队列区分,使用 put * */
    public void put(int value) throws InterruptedException {
        synchronized (this){
            // 若队列满了, 阻塞等待, 等下面的出队列操作调用 notify 方法后才可继续执行
            if(size == array.length){
                wait();
            }
            array[tail] = value;
            tail++;
            if(tail == array.length){
                tail = 0;
            }
            size++;
            // 唤醒 出队列操作
            notify();
        }
    }

    /* * 阻塞队列 出队列 * 为了和普通队列出队列区分,使用 take * */
    public int take() throws InterruptedException {
        int ret = -1;
        synchronized (this){
            // 若队列为空, 阻塞等待, 等到有元素入队列再开始
            if(size == 0){
                wait();
            }
            ret = array[head];
            head++;
            if(head == array.length){
                head = 0;
            }
            size--;
            // 唤醒 入队列操作
            notify();
        }
        return ret;
    }
}

代码解析:

上述两个 wait( ) 是一定不可能被同时调用的!!

  • 入队列操作的 wait( )
    当队列已满时,即:size == array.length;若有线程调用 put 方法,就让其执行 wait 方法,使该线程阻塞等待,直到有其他线程调用 take 方法,取出元素后,调用 notify 方法将刚才调用 put 方法产生阻塞的线程唤醒,接着继续执行 put 后续的操作
  • 出队列操作的 wait( )
    当队列为空时,即:size == 0;若有线程调用 take 方法,就让其执行 wait 方法,使该线程阻塞,直到有其他线程调用 put 方法,插入元素后,调用 notify 方法将刚才调用 take 方法产生阻塞的线程唤醒,接着继续执行 take 后续的操作

测试代码:

创建两个线程,分别模拟 生产者 和 消费者

  1. 消费的快,生产的慢 —— 预期看到:消费者线程会阻塞等待,有生产元素后消费者才能消费
  2. 消费的慢,生产的快 —— 预期看到::生产者线程会阻塞等待,消费者消费元素后,生产者才能继续生产

以第 2 种为例:

public static void main(String[] args) {
    BlockingQueue blockingQueue = new BlockingQueue();
	// 创建两个线程,分别模拟 生产者 和 消费者
    Thread producer = new Thread(){

        @Override
        public void run(){
            for (int i = 0; i < 10000; i++) {
                try {
                    blockingQueue.put(i);
                    System.out.println("生产元素: " + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    };
    producer.start();

    Thread consumer = new Thread(){

        @Override
        public void run(){
            while (true){
                try {
                    int ret = blockingQueue.take();
                    System.out.println("消费元素: " + ret);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    };
    consumer.start();
}

截取部分输出结果:

假设把上述代码中的 notify( ) 改成 notifyAll( ),此时会发生什么??

假设现在有三个线程,其中一个线程生产,两个线程消费,且消费速度快于生产速度

所以,两个消费者线程都触发了 wait 操作,也就是都发生了阻塞;当我们调用 notifyAll( ) ,会将上述两个线程都唤醒,然后这两个线程都去尝试重新竞争锁
假设:
消费者1,先获取到锁,于是执行出队列操作(执行完毕释放锁)
消费者2,后获取到锁,于是也会执行后续的出队列操作,但是刚才生产者生产的一个元素,已经被消费者1线程 给取走了,即,当前实际是一个空的队列,若强行执行出队列操作,就会出现逻辑上的错误!!

改正方法:
将 wait( ) 方法包裹的 if 改为 while

// 入队列
while (size == array.length){
	wait();
}

// 出队列
while (size == array.length){
	wait();
}

此时两个消费者线程尝试竞争锁
消费者1,先获取到锁,wait( ) 就返回了,再次执行 while 中的条件(由于当前生产者线程生产了一个元素,size 不为0 ),循环退出,之后,消费者1 就可以执行后续出队列操作,执行完毕后,释放锁
消费者2,后获取到锁,wait( ) 返回,再次执行 while 中的条件(由于刚才的消费者1 已经把生产的元素取走了,size 又是 0),循环继续执行,又一次调用 wait( ),只能继续等…

相关文章