并发编程系列之AQS实现原理
AQS(AbstractQueuedSynchronizer),抽象队列同步器,是juc中很多Lock锁和同步组件的基础,比如CountDownLatch、ReentrantLock、ReentrantReadWriteLock、Semaphore等等,提供了对资源的占用、释放,线程的等待、唤醒等等接口或具体实现,可以用在各种需要控制资源竞争的场景中。
看一下juc包里的AbstractQueuedSynchronizer
的源码:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//头结点
private transient volatile Node head;
//尾节点
private transient volatile Node tail;
//volatile修饰的信号量
private volatile int state;
// 链表的Node节点
static final class Node {
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
// ...
}
// AbstractQueuedSynchronizer父类
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient Thread exclusiveOwnerThread;
// ...
}
AQS主要由如下组件构成,最主要的还是信号量stata和CHL队列
源码里,state是由volatile关键字修饰的,在前面的学习中,可以知道并发编程的三大特性是:原子性、有序性、可见性。而volatile修饰的变量,可以保证有序性和可见性。
volatile保证可见性
这个涉及到JMM,不熟悉的读者可以翻一下我之前博客
cpu、jvm、内存都会在代码执行过程进行指令重排序,重排序过程,可能出现一些程序异常,所以,要保证有序性,可以加上volatile关键字,volatile避免重排序依赖于操作系统的内存屏障。
前面说明了加volatile的原因,然后还有一个特性,原子性,volatile只能保证单个变量的原子性,并不能保证一系列操作的原子性,所以不是线程安全的,然后AQS里是怎么保证线程安全的?然后AQS源码里怎么实现的?翻下源码,找到两个地方:
这里就是很常见的CAS锁,利用了unsafe的api,利用cpu操作的原子性保证这一操作的原子性
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
这个方法,直接将一个newStata设置,因为修饰的是一个变量,对基本类型的变量进行直接赋值时,是可以保证原子性的
protected final void setState(int newState) {
state = newState;
}
AQS 作者 Doug Lea 给出的一份AQS文档:http://gee.cs.oswego.edu/dl/papers/aqs.pdf,里面对AQS做了比较详细的描述,英文水平比较好的读者可以看看
CHL队列是AQS的一个核心,FIFO 队列,即先进先出队列,这个队列的主要作用是存储等待的线程,假设很多线程去抢锁,大部分线程是抢不到锁的,所以这些线程需要有个队列来存储,这个队列就是CHL队列,同样遵循FIFO规则。
CHL队列内部结构是一个双向链表的数据结构,基本组件是Node节点,队列中,分别用 head 和 tail 来表示头节点和尾节点,两者在初始化的时候都指向了一个空节点,head节点的线程就可以持有锁,然后后面的线程排队等待,如图所示
AbstractQueuedSynchronizer
类extends AbstractOwnableSynchronizer
类,AbstractOwnableSynchronizer
有exclusiveOwnerThread
这个定义的线程对象,表示独占模式下同步器的持有者。
前面已经对AQS的整体设计有了一个比较清晰的了解,主要关注点是信号量state和CHL队列,然后可以跟下源码实现,@see java.util.concurrent.locks.AbstractOwnableSynchronizer
,看源码不需要一点点看,看下主要的方法和类即可,理清框架的实现原理就可以,AQS主要关注*acquire*
和 *release*
这些方法就可以,分为两种类型方法,一种是独占式,另外一种是共享式,共享式方法都有加上一个shared
后缀
acquire
、acquireShared
:定义了资源争用的逻辑,如果没拿到,就等待tryAcquire
、tryAcquireShared
:实际执行占用资源的操作,由具体的AQS使用者实现release
、releasedShared
:定义了资源释放的逻辑,资源释放之后,调整队列后续节点继续争抢tryRelease
、tryReleaseShared
:实际执行资源释放的操作,具体的由AQS使用者实现下面跟下AQS源码,注意对比独占式和共享式的实现方式有什么区别
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire
方法总体思路:
tryAcquire()
方法尝试去获取资源,如果成功直接返回addWaiter
方法将当前线程包装成Node插入到CLH队列尾部,独占的标记为Node.EXCLUSIVE
acquireQueued
方法将线程阻塞在等待队列,直到获取到资源。整个等待过程,线程被中断过,返回true,否则返回false。获取到资源后而且被中断过,就会调用selfInterrupt
将中断补上tryAcquire
方法由具体使用者类实现,不实现是会抛出异常的,所以必须实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
addWaiter
方法作用,将当前线程包装成Node插入到CLH队列尾部
private Node addWaiter(Node mode) {
// 将当前线程封装成Node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 尾节点不能空,队列已经初始化过
if (pred != null) {
// Node节点的前prev指针指向尾节点
node.prev = pred;
// cas锁操作,保证线程安全
if (compareAndSetTail(pred, node)) {
// 尾节点指针指向Node节点
pred.next = node;
return node;
}
}
// 队列没初始化过,调用enq方法
enq(node);
return node;
}
跟一下enq
方法
private Node enq(final Node node) {
// 无限循环,类似于while(true),自旋操作
for (;;) {
// 尾节点
Node t = tail;
if (t == null) { // Must initialize
// 创建new一个节点,设置为头节点
if (compareAndSetHead(new Node()))
// 头结点赋值给尾结点
tail = head;
} else {
// 如果尾节点不为空,将Node节点的前驱指向尾节点
node.prev = t;
// cas,将node节点设置为尾节点
if (compareAndSetTail(t, node)) {
// 尾节点的next指针指向Node节点
t.next = node;
return t;
}
}
}
}
再往上看一下java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
方法的acquireQueued
方法,前面说到acquireQueued
方法作用是,线程排队等待,直到获取到资源
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋,无限循环
for (;;) {
// 寻找node节点前驱节点
final Node p = node.predecessor();
// 前驱节点是head节点,tryAcquire
if (p == head && tryAcquire(arg)) {
// 获取资源成功,将node节点设置为head新的节点
setHead(node);
// 原来的head节点设置为null,等待GC垃圾回收
p.next = null; // help GC
failed = false;
return interrupted;
}
// p不是头结点,或者tryAcquire()获取资源失败
// shouldParkAfterFailedAcquire判断是否可以被park
// parkAndCheckInterrupt判断park和Interrupt是否成功
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享式即共享资源可以被多个线程同时占有
先看顶层的acquireShared
方法:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared
方法是由具体使用者类去实现的,所以先看看doAcquireShared
源码:
private void doAcquireShared(int arg) {
// 封装为Node,标记为Node.SHARED
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 无限循环,自旋
for (;;) {
// 查找前驱节点
final Node p = node.predecessor();
// 是否为head节点
if (p == head) {
// 尝试抢资源
int r = tryAcquireShared(arg);
if (r >= 0) { // 抢资源成功
// 设置为头节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// p不是头结点,或者tryAcquire()获取资源失败
// shouldParkAfterFailedAcquire判断是否可以被park
// parkAndCheckInterrupt判断park和Interrupt是否成功
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个逻辑和独占式基本差不多,不同的地方在于入队的Node是标志为SHARED共享式的,tryAcquireShared是由具体的类去实现
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://smilenicky.blog.csdn.net/article/details/122414051
内容来源于网络,如有侵权,请联系作者删除!