一、AQS的核心设计思想
AbstractQueuedSynchronizer(JSR 166)是 Doug Lea 为 JDK 并发包设计的"排队-模板方法"框架。所有基于AQS的同步器(ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier)都遵循同一套模式:**用int state表示资源数量,用CLH队列管理等待线程**。
1.1 两大核心组成部分
// AQS 核心数据结构
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 资源计数器(volatile保证可见性)
// ReentrantLock: state=1表示已加锁,>1表示重入次数
// Semaphore: state=可用许可证数量
// CountDownLatch: state=初始count值
private volatile int state;
// CLH变体队列(FIFO双向队列)
// head = 当前持有锁的线程(或已取消的节点)
// tail = 最新加入的节点
private transient volatile Node head;
private transient volatile Node tail;
// Node是等待队列中的节点
static final class Node {
volatile int waitStatus; // CANCELLED/SIGNAL/PROPAGATE/CONDITION
volatile Node prev;
volatile Node next;
volatile Thread thread; // 封装的线程
Node nextWaiter; // 条件队列指针(ConditionObject用)
// 关键常量
static final int CANCELLED = 1; // 已取消(超时或中断)
static final int SIGNAL = -1; // 后继节点需要被唤醒
static final int CONDITION = -2; // 节点在条件队列中等待
static final int PROPAGATE = -3; // 共享锁释放需要传播给后继
}
}
// AQS子类需要覆盖的两个模板方法:
// ✅ tryAcquire(int arg) — 独占模式:尝试获取锁
// ✅ tryRelease(int arg) — 独占模式:尝试释放锁
// ✅ tryAcquireShared(int) — 共享模式:尝试获取共享锁
// ✅ tryReleaseShared(int) — 共享模式:尝试释放共享锁
// ✅ isHeldExclusively() — 是否被当前线程独占
二、acquire模板方法源码解析
2.1 独占模式获取锁
// public final void acquire(int arg)
// 入口:尝试"狡猾获取"——tryAcquire失败后才入队等待
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 子类实现,原子CAS
acquireQueued( // 入CLH队列,自旋+park
addWaiter(Node.EXCLUSIVE), // 添加独占节点
arg)) {
// 如果acquireQueued返回true(被中断),自我中断
selfInterrupt();
}
}
// addWaiter: 快速路径一次CAS入队,失败则走enq自旋入队
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) { // 队列已初始化
node.prev = pred;
if (compareAndSetTail(pred, node)) { // CAS设置尾节点
pred.next = node;
return node;
}
}
enq(node); // 队列未初始化或CAS失败,自旋入队
return node;
}
// enq: 初始化队列并入队
private Node enq(Node node) {
for (;;) { // 死循环,保证最终入队成功
Node t = tail;
if (t == null) { // 队列未初始化,先初始化head节点(空哨兵)
if (compareAndSetHead(new Node())) {
tail = head; // head和tail都指向空哨兵节点
}
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t; // 成功入队
}
}
}
}
// acquireQueued: 在队列中自旋等待(核心)
final boolean acquireQueued(Node node, int arg) {
boolean interrupted = false;
for (;;) {
Node pred = node.prev;
if (pred == head) { // 关键:前驱是head说明是第一个等待线程
boolean acquired = tryAcquire(arg);
if (acquired) { // 再次尝试获取锁
setHead(node); // 获取成功,设为新的head
pred.next = null; // 断开旧head的next,帮助GC
return interrupted;
}
}
// 不能获取锁:判断是否需要park
if (shouldParkAfterFailedAcquire(pred, node)) {
// parkAndCheckInterrupt: 阻塞线程,等待被唤醒
// park()使用的是LockSupport.park(),底层是pthread_cond_wait
if (parkAndCheckInterrupt()) {
interrupted = true; // 醒来后被中断
}
}
}
}
// shouldParkAfterFailedAcquire: 判断是否可以安全park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) { // ✅ 安全:前驱会唤醒自己
return true;
}
if (ws > 0) { // ❌ CANCELLED:跳过已取消的节点
do {
node.prev = pred = pred.prev; // 循环找到有效前驱
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// ws=0(PROPAGATE/CONDITION):需要确保前驱能唤醒自己
// 将前驱设为SIGNAL(暗示需要唤醒后继)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false; // 继续自旋,下一轮再检查
}
2.2 独占模式释放锁
// public final boolean release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) { // 子类实现,释放资源
Node h = head;
// head存在且waitStatus != 0,需要唤醒后继
if (h != null && h.waitStatus != 0) {
unparkSuccessor(h); // 唤醒head的后继节点
}
return true;
}
return false;
}
// unparkSuccessor: 唤醒队列中第一个需要被唤醒的节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) { // 将head的waitStatus改为0(已处理)
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next; // 后继节点
if (s == null || s.waitStatus > 0) { // 后继为空或已取消
// 从tail向前遍历,找到离head最近的非取消节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) {
if (t.waitStatus <= 0) { s = t; }
}
}
if (s != null) {
LockSupport.unpark(s.thread); // 唤醒后继线程
}
}
// ✅ 关键洞察:为什么从tail向前遍历?
// 因为enq中node.prev=t和tail=node之间可能有竞争
// 导致tail.next=null但tail.prev已设置
// 从tail向前可以找到还未完全入队的节点
三、共享模式:Semaphore与CountDownLatch
3.1 共享模式获取:doAcquireSharedInterruptibly
// Semaphore的acquire()走的是共享模式
public final void acquireSharedInterruptibly(int arg) {
if (Thread.interrupted()) throw new InterruptedException();
if (tryAcquireShared(arg) < 0) { // < 0 表示获取失败
doAcquireSharedInterruptibly(arg);
}
}
private void doAcquireSharedInterruptibly(int arg) {
Node node = addWaiter(Node.SHARED); // 添加SHARED模式节点
boolean failed = true;
try {
for (;;) {
Node pred = node.prev;
if (pred == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // ≥0 表示成功
// 关键:传播(Propagate)逻辑
setHeadAndPropagate(node, r);
pred.next = null; // 帮助GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(pred, node) &&
parkAndCheckInterrupt()) {
throw new InterruptedException();
}
}
} finally {
if (failed) cancelAcquire(node);
}
}
// setHeadAndPropagate: 设置新head并传播唤醒
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node); // 设为新head
// 传播条件:
// propagate > 0:还有可用资源
// h.waitStatus < 0:旧head需要唤醒
// (h = head) != null:新head也需要唤醒
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) { // 后继是共享节点
doReleaseShared(); // 继续传播,唤醒更多等待线程
}
}
}
// doReleaseShared: 共享模式释放(核心传播逻辑)
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// head是SIGNAL,需要唤醒后继
if (compareAndSetWaitStatus(h, SIGNAL, 0)) {
unparkSuccessor(h);
}
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
// PROPAGATE:解决多线程同时释放时的传播问题
// 保证releaseShared也能正确传播唤醒
}
}
if (h == head) break; // head没变则退出
}
}
3.2 CountDownLatch 源码
// CountDownLatch只使用共享模式,不涉及独占
// Sync是CountDownLatch的内部AQS实现
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) { setState(count); } // state=初始count
// 获取:count减到0才能成功
protected int tryAcquireShared(int acquires) {
return getState() == 0 ? 1 : -1;
}
// 释放:count递减,递减到0时唤醒所有等待线程
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0) return false; // 已经是0了,不能再减
int nextc = c - 1;
if (compareAndSetState(c, nextc)) {
return nextc == 0; // 只有减到0才返回true唤醒
}
}
}
}
// await()逻辑:count>0时进入共享等待
// countDown()逻辑:每次count--,减到0时调用doReleaseShared唤醒所有节点
// 典型应用:等待多个线程完成
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(N);
// N个线程: start.await() → 工作 → end.countDown()
// 主线程: start.countDown() → end.await() → 全部完成
四、ConditionObject源码:await/signal
// ConditionObject是Object.wait/notify的替代方案
// 每个Condition维护一个独立的等待队列(不同于AQS的CLH队列)
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter; // 条件队列头(FIFO)
private transient Node lastWaiter; // 条件队列尾
// await: 将当前线程加入条件队列,并释放锁
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
Node node = addConditionWaiter(); // 加入条件队列
int savedState = fullyRelease(node); // 释放所有已持有的锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 不在CLH队列中
LockSupport.park(this);
if ((interruptMode = checkInterruptWhilePark(node)) != 0) break;
}
// 被signal唤醒后,重新acquireQueued竞争锁
acquireQueued(node, savedState);
// ...
}
// signal: 将条件队列的第一个节点移到CLH队列
public final void signal() {
if (!isHeldExclusively()) throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null) {
doSignal(first);
}
}
private void doSignal(Node first) {
do {
firstWaiter = first.nextWaiter;
if (firstWaiter == null) lastWaiter = null;
first.nextWaiter = null; // 断开条件队列连接
// 关键:将节点从条件队列转移到CLH队列
transferForSignal(first) ?
(first = null) : // 成功,结束
(first = firstWaiter); // 失败(已取消),处理下一个
} while (first != null);
}
// transferForSignal: 将节点转移到CLH队列
final boolean transferForSignal(Node node) {
// 如果节点已被取消(waitStatus != CONDITION),返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 入CLH队列
Node p = enq(node);
int ws = p.waitStatus;
// 如果前驱已取消,或前驱设为SIGNAL失败(CAS)
// 则直接unpark当前节点(确保signal生效)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
}
// ⚠️ 生产使用注意:
// - signal()只是将节点移到CLH队列,不立即唤醒
// - 节点在CLH队列中竞争锁,成功后才继续执行
// - signalAll():将所有条件队列节点移到CLH队列
// - 与Object.wait/notify相比:Condition可以创建多个,一个锁可以有多个条件队列
五、自定义同步器实战
// 自定义互斥锁(不可重入)
public class Mutex implements Lock {
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer {
protected boolean tryAcquire(int arg) {
// CAS将state从0设为1(成功=获取锁)
return compareAndSetState(0, 1);
}
protected boolean tryRelease(int arg) {
setState(0); // 释放锁,直接设为0
return true;
}
public boolean isHeldExclusively() { return getState() == 1; }
}
public void lock() { sync.acquire(1); }
public void unlock() { sync.release(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public Condition newCondition() { return sync.newCondition(); }
}
// 自定义信号量(资源池)
public class BoundedSemaphore {
private final Sync sync;
public BoundedSemaphore(int permits) {
sync = new Sync(permits);
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) { setState(permits); }
protected int tryAcquireShared(int reduces) {
for (;;) {
int available = getState();
int remaining = available - reduces;
if (remaining < 0) return -1; // 获取失败
if (compareAndSetState(available, remaining)) return 1; // 成功
}
}
protected boolean tryReleaseShared(int returns) {
for (;;) {
int current = getState();
int next = current + returns;
if (compareAndSetState(current, next)) return true;
}
}
}
}