文章40 | 阅读 15156 | 点赞0
以下面这段代码为例,我们分析以下ReentrantLock的工作原理,聊一聊,ReentrantLock到底做了哪些事情!
public class ReentrantLockTest {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
new Thread(()->{
lock.lock();
try {
System.out.println("A:这是一段加锁代码段");
} finally {
//lock.unlock();
}
},"A").start();
new Thread(()->{
lock.lock();
try {
System.out.println("B:这是一段加锁代码段");
} finally {
lock.unlock();
}
},"B").start();
}
}
在上面的代码你是不是疑惑为什么线程A的lock.unlock()
是被注释掉的。是不是我写错了?
这样不是会导致死锁?
这是我有意为之的,因为如题,今天我们是为了学习AQS而来的。
为了学习效果好,最后将上面的代码复制粘贴到IDEA / eclipse
中我们调试一下这段程序。
首先在类的初始化时成员属性lock就被赋值new ReentrantLock();
得到的是一个ReentrantLock实例,并且成员属性sync
被赋值为非公平锁NonfairSync
实例
对应源码
然后当我们让线程start()
时,在执行lock.lock()
方法时
调用的时ReentrantLock中的lock()
由于我们前面将sync = new NonfairSync();
因此本质上调用的是NonfairSync继承父类Sync.lock()
,
代码就执行到了
这里面有一个判断,判断中的方法在子类NonfairSync
中有实现Sync中定义的abstract
方法。
这个时候需要到Sync的子类NonfairSync中找这个方法。如下
final boolean initialTryLock() {
Thread current = Thread.currentThread(); // 获取当前线程
if (compareAndSetState(0, 1)) { // 之前没有加锁,则将state更新为1,并进行加锁 setExclusiveOwnerThread(current);
setExclusiveOwnerThread(current); // 将当前线程设置为独占
return true; // 加锁成功
} else if (getExclusiveOwnerThread() == current) {// 之前加过锁,相当于重入锁(也意味着当前线程本来就获得到了锁),第二次进入则进入代码块
int c = getState() + 1; // 将锁计数+1,相当于多次加锁,每加一次锁就会+1
if (c < 0) // 小于0,可能是超过int的上限导致变成负数抛异常
throw new Error("Maximum lock count exceeded");
setState(c);//更新state值
return true;
} else //这个分支说明当前线程所操作的资源已经被加锁了,需要等待释放锁后获得锁。所以返回false
return false;
}
阅读这段代码,会发现第一个判断调用的则是AQS的final方法
// CAS操作对state设置新值,(如果state的值为expect则将state更新为update)
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSetInt(this, STATE, expect, update);
}
而这个U则是封装好的一个支持cpu指令操作的一个工具类,目的就是支持CAS操作
private static final Unsafe U = Unsafe.getUnsafe(); //用的都是最底层的操作,里面有很多关于cas操作的native方法
实际上就是将state值由0,变成1
只是这种操作更安全,因为是原子性的一个操作更新值。如果更新成功则进入第一个代码块
执行 setExclusiveOwnerThread(current);将当前线程设置为独占线程,本质上就是将当前线程存起来,追踪过去又跑到了AQS父类AOS的方法原方法的源码
private transient Thread exclusiveOwnerThread; //成员属性
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread; //给成员属性赋值
}
所以这个方法也就没啥好讲的,意思就是将当前线程设置为独占排他线程。也就是一个象征性的意义没有多大作用,真正控制线程的在后面,我们继续深入。
执行initialTryLock()
方法时,根据方法内的3个分支
1.第一次调用lock
2.同一个线程重复调用了lock
3.不同线程使用同一把锁调用了lock
三个分支会导致3种情况
1、2分支则初始化锁成功,返回true
3则会返回false表示初始化锁失败
回到之前Sync中调用initialTryLock()
的代码,由于是第一次加锁因此执行的是第一个分支,返回true
下面是我加上注释后的源码
@ReservedStackAccess
final void lock() {
//initialTryLock()本质上调用的是子类重写的方法例如:NonfairSync和FairSync内的initialTryLock()
// 只有已经上锁了,当前线程没有获得到锁才会进入if
if (!initialTryLock()){
/** * 如果是不同的线程则返回false,就执行if内的 acquire(1); * 又去执行AQS的 public final void acquire(int arg)方法, * 意思就是需要将其加入线程等待队列中(判断中initialTryLock()就已经知道这个线程没有获得到锁返回了false) */
acquire(1);// AQS的线程队列是在这个方法内部形成的,需求去父类AQS查看这个final修饰方法
}
}
会发现!true
返回false则if内的acquire(1);
不会执行就返回了。
然后对应我们线程A的lock方法也就执行完了。
new Thread(()->{
try {
lock.lock();
System.out.println("A:这是一段加锁代码段");
} finally {
//lock.unlock();
}
},"A").start();
然后就会执行打印输出语句,在控制台中打印出A:这是一段加锁代码段
由于解锁语句被注释了,因此这个线程内的代码就执行完了。
B和A的逻辑差不多,但线程B如何执行的?
同样会进入 lock,本质调用Sync类中定义的 lock
也会初始化锁执行initialTryLock()
但是由于此时的线程是另一个线程我们粘贴一下源码
final boolean initialTryLock() {
Thread current = Thread.currentThread(); // 获取当前线程
if (compareAndSetState(0, 1)) { // 之前没有加锁,则将state更新为1,并进行加锁 setExclusiveOwnerThread(current);
setExclusiveOwnerThread(current); // 将当前线程设置为独占
return true; // 加锁成功
} else if (getExclusiveOwnerThread() == current) {// 之前加过锁,相当于重入锁(也意味着当前线程本来就获得到了锁),第二次进入则进入代码块
int c = getState() + 1; // 将锁计数+1,相当于多次加锁,每加一次锁就会+1
if (c < 0) // 小于0,可能是超过int的上限导致变成负数抛异常
throw new Error("Maximum lock count exceeded");
setState(c);//更新state值
return true;
} else //这个分支说明当前线程所操作的资源已经被加锁了,需要等待释放锁后获得锁。所以返回false
return false;
}
会发现此时的state state是AQS类的成员属性
之前被上一个线程更新为了1,因为使用的是同一把锁。
第一个分支不走
又因为第二个判断发现当前线程B,和独占线程A不是同一个线程因此分支2也不走
getExclusiveOwnerThread() // 返回独占线程,来自AQS父类AOS方法
因此最终返回的是false
因此下面 if 中的代码需要执行
@ReservedStackAccess
final void lock() {
if (!initialTryLock()){
acquire(1);// AQS的线程队列是在这个方法内部形成的,需求去父类AQS查看这个final修饰方法
}
}
而acquire(int);
方法是来自非公平锁的父类的父类AQS内部定义的final方法
在AQS中源码是这样定义的,下面有两个acquire
上面的会调用下面更多参数的acquire
方法
下面的代码可以先不看,先拉到后面看我的解说,然后对照着我的解说看代码
/** * 尝试获得到锁,实际调用下面多个参数的acquire()方法 */
public final void acquire(int arg) {
/** * tryAcquire(arg)调用的是子类实现的tryAcquire(int),本身在AQS则是一个抽象方法 * 需要在子类中查看具体实现,例如:ReentrantLock中的内部类NonfairSync、FairSync中的tryAcquire(int) * 因此需要去子类查看这个方法的实现只有失败了才会进一步调用acquire(null, 1, false, false, false, 0L); */
if (!tryAcquire(arg)) {
// 注意参数值除了arg的值是变量,其它都是0会false或null,一遍而言传入的是1,除非一次性加了多次锁
acquire(null, arg, false, false, false, 0L);
}
}
/** * 抢占锁的方法,加锁的时候除了arg=1其它都是null或false * * @param node * @param arg 加锁次数 * @param shared 控制是否时共享线程队列也就是SharedNode的布尔值 * @param interruptible 是否时可中断线程 * @param timed 是否由最长等待时间 * @param time 中断超时时间 */
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread(); // 获取当前线程
byte spins = 0, postSpins = 0; //
boolean interrupted = false, first = false; // 中断变量值interrupted,first表示第一次进入方法
Node pred = null; //
// 自旋获取锁
for (; ; ) {
// 循环的第一次判断为 true && false (node为null pred=null,null!=null。返回false就不执行后面的赋值和判断)
// 自旋后的第二次由于是刚创建的则prev为null因此还是false,pred=null,还是false不执行
// 当node!=null且node.prev!=null说明节点已经入队了,因此第二个判断返回true需要判断!(first = (head == pred))返回的是false
if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {
// 进入这个代码块说明队列不为空且不止一个线程在等待
if (pred.status < 0) { // 实际就是传入的node的前一个节点的status是否<0
cleanQueue(); // 清空队列
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // 确保序列化
continue;
}
}
//循环第一次 false || true,需要进入代码块。pred在第一次判断就被赋值为null,循环进入前也是null
if (first || pred == null) {
boolean acquired;
try {
if (shared) {
// 循环第一次false进入else分支
acquired = (tryAcquireShared(arg) >= 0);
} else {
/** * 尝试抢占锁,如果没有抢到则会自旋,tryAcquire(arg);会一直重复调用,直到抢占成功 * 子类实现的方法。例如非公平锁的tryAcquired(1); * 如果state为0,则acquired则会变成true。将当前线程设置为独占并更新state为arg * 如果state不为0,则acquired=false,说明被其它线程上锁了 * * 自旋的起点是后面 * if (node == null) { * if (shared)// 如果是共享队列节点则创建SharedNode,然后由于后面没有代码则会自旋for循环重新执行一遍只是这个时候node不为null * node = new SharedNode(); //这个是线程队列的头节点,用来标识这个队列是一个共享锁线程等待队列 * else // 如果是一个排他锁创建一个排他节点 * node = new ExclusiveNode();// 线程队列的头节点,标识是一个排他锁线程队列 * } * 然后到这里的tryAcquire(arg);一直原地踏步,直到抢占到锁 * acquired更新为true * 进入catch后面的if * */
acquired = tryAcquire(arg); // 会回到子类(NonfairSync、FairSync等)的实现的方法尝试获得锁,如果失败则还是false,直到成功true
}
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
// true说明当前线程抢占到锁了
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1; // 返回1标签抢占到锁了这是这个方法的唯一结束点,其它分支始终都会死循环
}
}
/** * AQS队列的形成起点,头节点就是下面的SharedNode或ExclusiveNode,然后自旋 */
// 第一次进入node传入的是null因此进入
// 第二次由于第一次进入后node=new SharedNode();或node = new ExclusiveNode();则这个if就不会在进入,会进入第二个if因为pred=null
if (node == null) {
if (shared)// 如果是共享队列节点则创建SharedNode,然后由于后面没有代码则会自旋for循环重新执行一遍只是这个时候node不为null
node = new SharedNode(); //这个是线程队列的头节点,用来标识这个队列是一个共享锁线程等待队列
else // 如果是一个排他锁创建一个排他节点
node = new ExclusiveNode();// 线程队列的头节点,标识是一个排他锁线程队列
} else if (pred == null) { // 尝试将当前线程入队
node.waiter = current; // 将当前线程存入ExclusiveNode节点的waiter成员变量
Node t = tail; // 一开始tail=null
node.setPrevRelaxed(t); // 将node的prev指向tail,而此时node.next还是null,相当于加入到了队列的末尾,后面需要将队列末尾指向node形成双向的队列
if (t == null) // 第一次队列还没有形成因此t是null需要将头节点初始化
tryInitializeHead(); // 初始化头节点,并且在内部将tail也执行了这个初始的头节点
else if (!casTail(t, node)) // casTail(t, node)会将tail变成node
node.setPrevRelaxed(null); // 如果tail更新失败。则node.prev=null则又会重新进入if进行更新,直到更新成功。
else
t.next = node; // 将队列的末尾指向node形成双向队列(node.next则是null,也就是说aqs队列的末尾元素的next始终为null,prev会指向前一个节点)
} else if (first && spins != 0) { //
--spins; // 让出cpu使用权,减少线程调度得不公平性
Thread.onSpinWait(); // 让出cpu使用全和Thread.sleep(0)差不多的作用,但是Thread.onSpinWait();更高效,使用的是cpu指令
} else if (node.status == 0) { // 0是初始化赋得值,这里aqs队列得节点自然是要要让线程等待,因此更新status值为1(WAITING得值就是常量1)
node.status = WAITING; // 如果status为0更新status值为常量WAITING=1,1表示等待
} else {
long nanos;
spins = postSpins = (byte) ((postSpins << 1) | 1);// spins!=0则会调用Thread.onSpinWait();,让当前线程让出cpu的使用权
if (!timed)//如果没有设置阻塞时间
LockSupport.park(this);// 阻塞当前线程
else if ((nanos = time - System.nanoTime()) > 0L)//如果设置了阻塞时长且时间nanos > 0
LockSupport.parkNanos(this, nanos); //阻塞nanos纳秒
else // 如果时间不合法 则break
break;
node.clearStatus();// 将status重新更新为0
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);//返回0 或者 返回CANCELLED常量负数
}
会发现执行acquire(1);
内部有一个 if 判断
判断内容是,将acquires的值设置为1
/** * 代码来自非公平锁内部 * 尝试获取锁(加锁)acquires次 */
protected final boolean tryAcquire(int acquires) {
// 如果加锁线程已经释放了锁,也就是state=0那么就将当前线程设置为独占线程并更新state值为1,表示抢占锁成功
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());// 将当前线程设置为独占线程,表示当前线程获得到锁
return true;
}
return false;//如果已经加过锁了由于state则不为0则会返回false,表示抢占锁失败
}
这里面的逻辑和前面我们分析过的差不多,很明显会返回false,因为state!=0
因为返回false,所以导致内部代码块需要执行也就是
if (!tryAcquire(1)) {
acquire(null, 1, false, false, false, 0L);
}
然后我们将这些参数值带入去看下多个参数的acquire方法
这个时候拉到上面我粘贴出来的代码,结合我的注释分析一遍流程。
如果我的注释中有错误的地方,可以在评论区给我指出,我会更新文章改正过来。
关于AQS的源码我也有点疑惑,那就是关于这个方法的spins、postSpins
这个参数的具体作用是干什么的。从子面量的意思来说分别表示spins
和过期的spins
,分别用这个两个变量存。
经过这个方法,会形成如下的队列
线程A是持有锁的线程因此不在aqs队列中,而B则因为没有获得锁,就进入了等待队列。
B指我上面的线程B产生的节点,在B之前会创建一个节点,然后利用这个节点作为头,真正的B却是head.next得到的aqs队列实际意义上的队列头。
这么做的目的就是为了方便利用AQS的成员属性快速找到头。相当于始终有一个head指针指向了aqs队列的头,始终有一个tail指针指向队列末尾节点。
通过源码我们很容易看出aqs利用自旋的方式创建了一个CLH队列,然后利用LockSupport.park()将线程进行阻塞
而释放锁在我们熟悉了AQS的数据结构后,以及前面的基础后很容易定位到源码
源码依此是
ReentrantLock类
public void unlock() {
sync.release(1);
}
AQS类
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);//唤醒head.next所指向的节点线程
return true;
}
return false;
}
Sync类
// 尝试释放锁
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//计算释放后的state值
if (getExclusiveOwnerThread() != Thread.currentThread())// 如果当前线程不是和独占线程同一个线程抛异常IllegalMonitorStateException
throw new IllegalMonitorStateException();
boolean free = (c == 0);// 计算是否要清除独占先(计算释放后是否还有线程持有锁)
if (free)
setExclusiveOwnerThread(null);//清除独占线程
setState(c);//更新state值
return free;
}
/** * 唤醒线程队列节点中的线程 * 传入的是head,通过head.next得到等待线程的第一个节点将其唤醒 */
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING); // 更新线程状态值为常量1
LockSupport.unpark(s.waiter); // 给当前节点的线程发放一个许可,唤醒该线程
}
}
执行完signalNext后就会将队列第一个节点中的线程唤醒,仔细观察if中的判断,会发现这个方法中的h应该传入的是head的意思,也就是说将真正意义的线程队列第一个节点(实际通过head.next
得到)然后通过LockSupport唤醒该节点中的线程
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://yumbo.blog.csdn.net/article/details/109501988
内容来源于网络,如有侵权,请联系作者删除!