一个初始值为0的变量 两个线程交替操作 一个加1 一个减1 每个线程遍历5轮
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class Resource {
private volatile boolean flag = true;//标志位,默认开启 进行生产消费的交互
private AtomicInteger atomicInteger = new AtomicInteger();//默认值是0
//声明阻塞队列BlockingQueue
private BlockingQueue<String> blockingQueue = null;
//构造方法的方式注入
public Resource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
//生产者
public void myProd() throws Exception {
String data = null;
boolean returnValue;
while (flag) {//当flag=true,开始生产
//atomicInteger默认值0,进行+1操作,并转成字符串
data = atomicInteger.incrementAndGet() + "";
//每2秒钟把data的值添加到队列中
returnValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (returnValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入队列数据" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入队列数据" + data + "失败");
}
//睡眠一秒
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\t flag=" + flag+"\t ,生产操作停止");
}
//消费者
public void myConsumer() throws Exception {
String result = null;
while (flag) {//当flag=true,开始消费
//每2秒钟从队列中取值
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
//队列中值为空
if(null==result||"".equalsIgnoreCase(result)){
flag=false;
System.out.println(Thread.currentThread().getName()+"\t"+"超过2m没有取到 消费退出");
System.out.println();
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName() + "消费队列" + result + "成功");
}
}
public void stop() throws Exception{
flag=false;
}
}
/**
* @description: 生产者消费者模式示例代码(阻塞队列版)
* @author: xz
*/
public class ProdConsumerBlockQueueDemo {
public static void main(String[] args) throws Exception {
//创建Resource对象,并传入由数组结构组成的有界阻塞队列,初始大小为10
Resource resource = new Resource(new ArrayBlockingQueue<>(10));
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t生产线程启动");
try {
resource.myProd();
System.out.println();
System.out.println();
} catch (Exception e) {
e.printStackTrace();
}
},"Prod 线程").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t消费线程启动");
try {
resource.myConsumer();
System.out.println();
System.out.println();
} catch (Exception e) {
e.printStackTrace();
}
},"consumer 线程").start();
//睡眠5秒钟
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println();
System.out.println();
System.out.println();
System.out.println("5秒钟时间到,停止活动");
resource.stop();
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://wwwxz.blog.csdn.net/article/details/122570197
内容来源于网络,如有侵权,请联系作者删除!