Java 中队列同步器 AQS(AbstractQueuedSynchronizer)实现原理
前言
在 Java 中通过锁来控制多个线程对共享资源的访问,使用 Java 编程语言开发的朋友都知道,可以通过 synchronized 关键字来实现锁的功能,它可以隐式的获取锁,也就是说我们使用该关键字并不需要去关心锁的获取和释放过程,但是在提供方便的同时也意味着其灵活性的下降。例如,有这样的一个场景,先获取锁 A,然后再获取锁 B,当锁 B 获取到之后,释放锁 A 同时获取锁 C,当获取锁 C 后,再释放锁 B 同时获取锁 D,依次类推,像这种比较复杂的场景,使用 synchronized 关键字就比较难实现了。
在 Java SE 5 之后,新增加了 Lock 接口和一系列的实现类来提供和 synchronized 关键字一样的功能,它需要我们显示的进行锁的获取和释放,除此之外还提供了可响应中断的锁获取操作以及超时获取锁等同步特性。JDK 中提供的 Lock 接口实现类大部分都是聚合一个同步器 AQS 的子类来实现多线程的访问控制的,下面我们看看这个构建锁和其它同步组件的基础框架——队列同步器 AQS(AbstractQueuedSynchronizer)。
AQS 基础数据结构
同步队列
队列同步器 AQS(下文简称为同步器)主要是依赖于内部的一个 FIFO(first-in-first-out)双向队列来对同步状态进行管理的,当线程获取同步状态失败时,同步器会将当前线程和当前等待状态等信息封装成一个内部定义的节点 Node,然后将其加入队列,同时阻塞当前线程;当同步状态释放时,会将同步队列中首节点唤醒,让其再次尝试去获取同步状态。同步队列的基本结构如下:
队列节点 Node
同步队列使用同步器中的静态内部类 Node 用来保存获取同步状态的线程的引用、线程的等待状态、前驱节点和后继节点。
同步队列中 Node 节点的属性名称和具体含义如下表所示:
属性类型和名称 | 描述 |
---|---|
volatile int waitStatus | 当前节点在队列中的等待状态 |
volatile Node prev | 前驱节点,当节点加入同步队列时被赋值(使用尾部添加方式) |
volatile Node next | 后继节点 |
volatile Thread thread | 获取同步状态的线程 |
Node nextWaiter | 等待队列中的后继节点,如果当前节点是共享的,则该字段是一个 SHARED 常量 |
每个节点线程都有两种锁模式,分别为 SHARED 表示线程以共享的模式等待锁,EXCLUSIVE 表示线程以独占的方式等待锁。同时每个节点的等待状态 waitStatus 只能取以下表中的枚举值:
枚举值 | 描述 |
---|---|
SIGNAL | 值为 -1,表示该节点的线程已经准备完毕,等待资源释放 |
CANCELLED | 值为 1,表示该节点线程获取锁的请求已经取消了 |
CONDITION | 值为 -2,表示该节点线程等待在 Condition 上,等待被其它线程唤醒 |
PROPAGATE | 值为 -3,表示下一次共享同步状态获取会无限进行下去,只在 SHARED 情况下使用 |
0 | 值为 0,初始状态,初始化的默认值 |
同步状态 state
同步器内部使用了一个名为 state 的 int 类型的变量表示同步状态,同步器的主要使用方式是通过继承,子类通过继承并实现它的抽象方法来管理同步状态,同步器给我们提供了如下三个方法来对同步状态进行更改。
方法签名 | 描述 |
---|---|
protected final int getState() | 获取当前同步状态 |
protected final void setState(int newState) | 设置当前同步状态 |
protected final boolean compareAndSetState(int expect, int update) | 使用 CAS 设置当前状态,该方法能够保证状态设置的原子性 |
在独享锁中同步状态 state 这个值通常是 0 或者 1(如果是重入锁的话 state 值就是重入的次数),在共享锁中 state 就是持有锁的数量。
独占式同步状态获取与释放
同步器中提供了 acquire(int arg) 方法来进行独占式同步状态的获取,获取到了同步状态也就是获取到了锁,该方法源码如下所示:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
方法首先会调用 tryAcquire 方法尝试去获取锁,查看方法的源码可以发现,同步器并未对该方法进行实现(只是抛出一个不支持操作异常 UnsupportedOperationException),这个方法是需要后续同步组件的开发人员自己去实现的,如果方法返回 true 则表示当前线程成功获取到锁,调用 selfInterrupt() 中断当前线程(PS:这里留给大家一个问题:为什么获取了锁以后还要中断线程呢?),方法结束返回,如果方法返回 false 则表示当前线程获取锁失败,也就是说有其它线程先前已经获取到了锁,此时就需要把当前线程以及等待状态等信息添加到同步队列中,下面来看看同步器在线程未获取到锁时具体是如何实现。
通过源码发现,当获取锁失败时,会执行判断条件与操作的后半部分 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),首先指定锁模式为 Node.EXCLUSIVE 调用 addWaiter 方法,该方法源码如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
通过方法参数指定的锁模式(共享锁 or 独占锁)和当前线程构造出一个 Node 节点,如果同步队列已经初始化,那么首先会进行一次从尾部加入队列的尝试,使用 compareAndSetTail 方法保证原子性,进入该方法源码可以发现是基于 sun.misc 包下提供的 Unsafe 类来实现的。如果首次尝试加入同步队列失败,会再次调用 enq 方法进行入队操作,继续跟进 enq 方法源码如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
通过其源码可以发现和第一次尝试加入队列的代码类似,只是该方法里面加了同步队列初始化判断,使用 compareAndSetHead 方法保证设置头节点的原子性,同样它底层也是基于 Unsafe 类,然后外层套了一个 for (;