跳至主要內容

JUC4 - AQS队列同步器

codejavajuc约 5596 字大约 19 分钟

JUC4

源码分析是 JDK 1.8 的,之后的版本是有改动的,JDK 17 就跟 JDK 1.8 对应的获取锁等操作有一些不同

队列同步器 AQS

前一部分了解了可重入锁和读写锁,这一章来分析下如何实现,它们的底层实现原理是怎么样的

比如对于ReentrantLocklock()方法,其具体实现为:

public void lock() {
    sync.lock();
}

可以看到,它的内部实际上啥都没做,而是交给了Sync对象在进行,并且,不只是这个方法,其他的很多方法都是依靠Sync对象在进行:

public void unlock() {
    sync.release(1);
}

那么这个Sync对象是干什么的呢?

可以看到,在 ReentrantLock 的源码里,公平锁和非公平锁都是继承自Sync,而Sync是继承自AbstractQueuedSynchronizer,简称队列同步器 (AQS):

abstract static class Sync extends AbstractQueuedSynchronizer {
   //...
}

static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}

所以,要了解它的底层到底是如何进行操作的,还得看队列同步器

AQS源码分析

AbstractQueuedSynchronizer(AQS) 是实现锁机制的基础,它的内部封装了包括锁的获取、释放、以及等待队列。

一个锁(排他锁为例)的基本功能就是获取锁、释放锁、当锁被占用时,其他线程来争抢会进入等待队列

AQS已经将这些基本的功能封装完成了,其中等待队列是核心内容,等待队列是由双向链表数据结构实现的,每个等待状态下的线程都可以被封装进结点中并放入双向链表中,而对于双向链表是以队列的形式进行操作的,它像这样:

alt text
alt text

AQS中有一个head字段和一个tail字段分别记录双向链表的头结点和尾结点,而之后的一系列操作都是围绕此队列来进行的。

每个线程都会被封装为一个 Node 作为链表的操作节点

静态内部类 Node

我们先来了解一下每个结点都包含了哪些内容:

// 每个处于等待状态的线程都可以是一个节点,并且每个节点是有很多状态的
static final class Node {
   // 每个节点都可以被分为独占模式节点或是共享模式节点,分别适用于独占锁和共享锁
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

   // 等待状态,这里都定义好了
   // 唯一一个大于0的状态,表示已失效,可能是由于超时或中断,此节点被取消。
    static final int CANCELLED =  1;
   // 此节点后面的节点被挂起(进入等待状态)
    static final int SIGNAL    = -1; 
   // 在条件队列中的节点才是这个状态
    static final int CONDITION = -2;
   // 传播,一般用于共享锁
    static final int PROPAGATE = -3;

    volatile int waitStatus;    // 等待状态值
    volatile Node prev;   // 双向链表基操,标记前一个节点
    volatile Node next;
    volatile Thread thread;   // 每一个线程都可以被封装进一个节点进入到等待队列
  
    Node nextWaiter;   // 在等待队列中表示模式,条件队列中作为下一个结点的指针

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {
    }

    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
AQS初始化

state就表示的是我们当前锁的一个状态

在一开始的时候,headtail都是nullstate为默认值0

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;

不用担心双向链表不会进行初始化,初始化是在实际使用时才开始的,先不管,我们接着来看其他的初始化内容:

// 直接使用Unsafe类进行操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 记录类中属性的在内存中的偏移地址,方便Unsafe类直接操作内存进行赋值等(直接修改对应地址的内存)

// 这里对应的就是AQS类中的state成员字段
private static final long stateOffset;
// 这里对应的就是AQS类中的head头结点成员字段
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {   
    // 静态代码块,在类加载的时候就会自动获取偏移地址
    try {
        // 先通过反射 AbstractQueuedSynchronizer.class.getDeclaredField("state") 得到对应的 Filed对象
        // 然后再基于此找到对应的属性偏移量
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

//通过CAS操作来修改头结点
private final boolean compareAndSetHead(Node update) {
    //调用的是Unsafe类的compareAndSwapObject方法,通过CAS算法比较对象并替换
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

/**
 * CAS tail field. Used only by enq.
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

/**
 * CAS waitStatus field of a node.
 */
private static final boolean compareAndSetWaitStatus(Node node,
                                                        int expect,
                                                        int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                    expect, update);
}

/**
 * CAS next field of a node.
 */
private static final boolean compareAndSetNext(Node node,
                                                Node expect,
                                                Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

可以发现,队列同步器由于要使用到CAS算法,所以,直接使用了Unsafe工具类,Unsafe类中提供了CAS操作的方法(Java无法实现,底层由C++实现)所有对AQS类中成员字段的修改,都有对应的CAS操作封装

CAS就是不会强行加锁(无锁算法 乐观锁),需要在替换值的时候进行比较,只有在赋值前发现原始值是我们预期的值才会进行赋值

通过 静态代码块 在加载时就获得对应的 head tail 以及 state 对应的内存地址偏移量,然后再通过 Unsafe 来直接操作对应内存地址赋值

如何使用AQS

它提供了一些可重写的方法(根据不同的锁类型和机制,可以自由定制规则,并且为独占式和非独占式锁都提供了对应的方法),以及一些已经写好的模板方法(模板方法会调用这些可重写的方法),使用此类只需要将可重写的方法进行重写,并调用提供的模板方法,从而实现锁功能(学习过设计模式会比较好理解一些)

总共5个

我们首先来看可重写方法:

//独占式获取同步状态,查看同步状态是否和参数一致,如果返没有问题,那么会使用CAS操作设置同步状态并返回true
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

//独占式释放同步状态
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

//共享式获取同步状态,返回值大于0表示成功,否则失败
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

//共享式释放同步状态
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

//是否在独占模式下被当前线程占用(锁是否被当前线程持有)
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

可以看到,这些需要重写的方法默认是直接抛出UnsupportedOperationException,也就是说根据不同的锁类型,我们需要去实现对应的方法

ReentrantLock源码分析 (公平锁分析)

我们可以来看一下ReentrantLock(此类是全局独占式的)中的公平锁是如何借助AQS实现的

公平锁 Lock 实现
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

   // 加锁操作调用了模板方法acquire
   // 为了防止各位绕晕,请时刻记住
   // lock方法一定是在某个线程下为了加锁而调用的,并且同一时间可能会有其他线程也在调用此方法
    final void lock() {
        acquire(1);
    }

    ...
}
调用 acquire()

直接调用了AQS提供的模板方法acquire(),我们来看看它在AQS类中的实现细节:

@ReservedStackAccess 
// 这个是JEP 270添加的新注解,它会保护被注解的方法,通过添加一些额外的空间,防止在多线程运行的时候出现栈溢出,下同
// arg 是锁的次数
// arg == 1
public final void acquire(int arg) {
    // tryAcquire 尝试获取锁
    // 如果未成功 
    // addWaiter(Node.EXCLUSIVE)
    // 然后 acquireQueued()
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))   
        // 节点为独占模式Node.EXCLUSIVE
        selfInterrupt();
}

首先会调用tryAcquire()方法(这里是由FairSync类实现的),如果尝试加独占锁失败(返回false了)说明可能这个时候有其他线程持有了此独占锁

那么当前线程需要放入等待队列,所以会用 addWaiter(Node.EXCLUSIVE)

addWaiter()

当尝试获取锁失败了,就需要将当前线程节点放到AQS的等待队列里,返回的是当前线程节点

所以当前线程得先等着,那么会调用addWaiter()方法将线程加入等待队列中:

mode == Node.EXCLUSIVE == null 独占模式,应该就是一个 Node 节点就一个线程排队,要是共享可能有多个

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 先尝试使用CAS直接入队,如果这个时候其他线程也在入队(就是不止一个线程在同一时间争抢这把锁)就进入enq()
    Node pred = tail;
    // 如果 pred (tail) 不是 null 就表示AQS里面至少有一个等待节点
    // 那么直接把当前节点放到队尾就行了
    if (pred != null) {
        node.prev = pred;
        // 可能这时候也有其他也同时加入
        // 通过CAS保证只有一个线程可以成功
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 否则 说明此时队里是空的,或者别人先抢了 你创建失败
    // 此方法是CAS快速入队失败时调用
    // 那只能 enq 了
    enq(node);
    return node;
}

private Node enq(final Node node) {
   // 自旋形式入队,可以看到这里是一个无限循环
    for (;;) {
        Node t = tail;
        if (t == null) {  
            // 这种情况只能说明头结点和尾结点都还没初始化
            // 初始化头结点和尾结点
            // 也要通过 CAS 来保证只有一个线程来初始化就行
            // 头结点相当于 虚拟头结点,其 next 是第一个有效节点
            if (compareAndSetHead(new Node()))   
                tail = head;
        } else {
            // 说明AQS队列有值,或者你这个线程不行,CAS抢失败了
            // 那就下来再试试吧
            node.prev = t;
            // 先将自己的 prev 指向尾节点
            // 即使 后面 CAS 失败了也无所谓
            // 下次就 prev 也变了
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;   
                // 只有CAS成功的情况下,才算入队成功
                // 如果CAS失败,那说明其他线程同一时间也在入队,并且手速还比当前线程快,刚好走到CAS操作的时候,其他线程就先入队了
                // 那么这个时候node.prev就不是我们预期的节点了,而是另一个线程新入队的节点
                // 所以说得进下一次循环再来一次CAS,这种形式就是自旋
            }
        }
    }
}

在了解了addWaiter()方法会将节点加入等待队列之后,我们接着来看,addWaiter()会返回已经加入的节点

acquireQueued()

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

目的就是我已经在AQS等待队列里了,通过 acquireQueued 来不断尝试按照队列顺序去获取锁

acquireQueued()在得到返回的节点时,也会进入自旋状态,等待唤醒(也就是开始进入到拿锁的环节了):

@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 无限循环 自旋
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {   
                // 可以看到当此节点位于队首(node.prev == head)时,会再次调用tryAcquire方法获取锁
                // 即 node.prev == head 说明它 node 已经是第一个等待节点了
                // 如果获取成功,会返回此过程中是否被中断的值
                
                setHead(node);    
                // 新的头结点设置为当前结点,并且将其的 next 和 prev 清空

                p.next = null; 
                // 原有的头结点没有存在的意义了
                
                failed = false;   //没有失败
                return interrupted;   //直接返回等待过程中是否被中断
            } 
            // 依然没获取成功
            // 可能当前节点不是第一个 或者 锁还没有释放
            // 那么就要开始尝试将线程挂起

            // shouldParkAfterFailedAcquire(p, node)
            // 将当前节点的前驱节点等待状态设置为SIGNAL,如果失败将直接开启下一轮循环,直到成功为止,如果成功接着往下

            // 只有成功将前驱结点的等待状态设为 SIGNAL, 才会将当前线程挂起
            // 表示开始等待了

            // parkAndCheckInterrupt()
            // 挂起线程进入等待状态,等待被唤醒
            // 如果在等待状态下被中断,那么会返回true,直接将中断标志设为true,否则就是正常唤醒,继续自旋
            if (shouldParkAfterFailedAcquire(p, node) &&  parkAndCheckInterrupt())   
                // 意思只有在等待状态下被中断才会进入
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);   
    // 通过unsafe类操作底层挂起线程(会直接进入阻塞状态)
    return Thread.interrupted();
}

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;   // 已经是SIGNAL,直接true
    if (ws > 0) {   // 不能是已经取消的节点,必须找到一个没被取消的
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;   //直接抛弃被取消的节点
    } else {
        // 不是SIGNAL,先CAS设置为SIGNAL(这里没有返回true因为CAS不一定成功,需要下一轮再判断一次)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;   // 返回false,马上开启下一轮循环
}

所以,acquire()中的if条件如果为true,那么只有一种情况,就是等待过程中被中断了,其他任何情况下都是成功获取到独占锁,所以当等待过程被中断时,会调用selfInterrupt()方法:

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

这里就是直接向当前线程发送中断信号了。

上面提到了LockSupport类,它是一个工具类,我们也可以来玩一下这个parkunpark:

public static void main(String[] args) throws InterruptedException {
    Thread t = Thread.currentThread();  //先拿到主线程的Thread对象
    new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println("主线程可以继续运行了!");
            LockSupport.unpark(t);
           //t.interrupt();   发送中断信号也可以恢复运行
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    System.out.println("主线程被挂起!");
    LockSupport.park();
    System.out.println("主线程继续运行!");
}

这里我们就把公平锁的lock()方法实现讲解完毕了

尝试获取锁方法 tryAcquire()

可重入独占锁: 如果你是第一个节点才会把锁给你,或者队列中没有等待节点

(可重入独占锁的公平实现) 接着我们来看公平锁的tryAcquire()方法:

static final class FairSync extends Sync {
    // 可重入独占锁的公平实现
    @ReservedStackAccess
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();   
        // 先获取当前线程的Thread对象
        int c = getState();     
        // 获取当前AQS对象状态(独占模式下0为未占用,大于0表示已占用)
        
        if (c == 0) {       
            // 如果是0,那就表示没有占用,现在我们的线程就要来尝试占用它

            // hasQueuedPredecessors()
            // true if there is a queued thread preceding the current thread, and false if the current thread is at the head of the queue or the queue is empty
            // 逻辑是当AQS存在队列 (tail != head) 且 (第一个结点为空 (恰好在插入第二个节点) 或者 第一个节点的线程不是当前线程) 
            // 返回true,说明你不是第一个等待的人,接着排队
            // 如果没必要排队,就说明可以直接获取锁
            // compareAndSetState(0, acquires)
            // 锁上 尝试+1 因为有可能好多线程过了 !hasQueuedPredecessors() 所以也需要 CAS
            // CAS设置状态,如果成功则说明成功拿到了这把锁,失败则说明可能这个时候其他线程在争抢,并且还比你先抢到
            if (!hasQueuedPredecessors() &&    
                compareAndSetState(0, acquires)) {   
                setExclusiveOwnerThread(current);    
                // 成功拿到锁,会将独占模式所有者线程设定为当前线程(这个方法是父类AbstractOwnableSynchronizer中的,就表示当前这把锁已经是这个线程的了)
                return true;   
                // 占用锁成功,返回true
            }
        }
        // 如果不是0,那就表示被线程占用了,这个时候看看是不是自己占用的,如果是,由于是可重入锁,可以继续加锁
        else if (current == getExclusiveOwnerThread()) {   
            int nextc = c + acquires;    
            // 多次加锁会将状态值进行增加,状态值就是加锁次数
            if (nextc < 0)   //加到int值溢出了?
                throw new Error("Maximum lock count exceeded");
            setState(nextc);   //设置为新的加锁次数
            return true;
        }

        // 其他任何情况都是加锁失败
        return false;
    }
}
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

在了解了公平锁的实现之后,是不是感觉有点恍然大悟的感觉,虽然整个过程非常复杂,但是只要理清思路,还是比较简单的。

公平锁 unlock() 实现

加锁过程已经OK,我们接着来看,它的解锁过程,unlock()方法是在AQS中实现的:

public void unlock() {
    sync.release(1);    
    // 直接调用了AQS中的release方法,参数为1表示解锁一次state值-1
}
@ReservedStackAccess
public final boolean release(int arg) {
    // 和tryAcquire一样,也得子类去重写,释放锁操作
    if (tryRelease(arg)) {
        // 当解锁成功
        // 释放锁成功后,获取新的头结点
        // 所以加锁的时候,只是把node的thread和prev清空,但 waitStatus 还是保留
        Node h = head;

        // 根据头结点的 waitStatus 来判断第一个等待节点是否开始等待了
        if (h != null && h.waitStatus != 0)   
        //如果新的头结点不为空并且不是刚刚建立的结点(初始状态下status为默认值0,而上面在进行了shouldParkAfterFailedAcquire之后,会被设定为SIGNAL状态,值为-1)
            unparkSuccessor(h);   //唤醒头节点下一个节点中的线程
        return true;
    }
    return false;
}
tryRelease 尝试释放锁

那么我们来看看tryRelease()方法是怎么实现的,具体实现在Sync中:

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;   
    // 先计算本次解锁之后的状态值
    if (Thread.currentThread() != getExclusiveOwnerThread())   
    // 因为是独占锁,那肯定这把锁得是当前线程持有才行
        throw new IllegalMonitorStateException();   //否则直接抛异常
    
    boolean free = false;
    if (c == 0) {  
        // 如果解锁之后的值为0,表示已经完全释放此锁
        free = true;
        setExclusiveOwnerThread(null);  
        // 将独占锁持有线程设置为null
    }
    setState(c);   // 状态值设定为c
    // 如果不是0表示此锁还没完全释放,返回false,是0就返回true
    return free;
}

如果已经将锁释放了,那么就要去 AQS 找下一个等待节点线程来进行唤醒

unparkSuccessor 唤醒下一个节点
private void unparkSuccessor(Node node) {
    // 将等待状态waitStatus设置为初始值0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //获取下一个结点
    Node s = node.next;
    // 如果下一个结点为空或是等待状态是已取消,那肯定是不能通知unpark的
    if (s == null || s.waitStatus > 0) {   
        // 这时就要遍历所有节点再另外找一个符合unpark要求的节点了
        s = null;

        // 这里是从队尾向前
        // 因为enq()方法中的t.next = node是在CAS之后进行的
        // 而 node.prev = t 是CAS之前进行的,所以从后往前一定能够保证遍历所有节点
        for (Node t = tail; t != null && t != node; t = t.prev)   
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)   //要是找到了,就直接unpark,要是还是没找到,那就算了
        LockSupport.unpark(s.thread);
}

为什么要从队尾找

因为在加锁被放到等待队列时,会有 addWaiter 以及 enq 方法,其中需要把当前节点放到队列的末尾,但此时也有可能会有好多线程一起放入末尾,为了避免并行错误, 先将自己的 prev 指向尾节点,然后来一次 CAS 看看能不能让 尾节点变成当前节点,只有成功了,才会把之前的尾节点的 next 指向当前节点 t.next = node;

所以存在这么一种可能,在当前队列中,你刚好把一个节点变成尾节点,但是还没来得及将上一个的尾节点指向它 t.next = node;, 然后此时又刚好持有锁的线程在找唤醒节点,就有可能漏掉

node.prev = t;
// 先将自己的 prev 指向尾节点
// 即使 后面 CAS 失败了也无所谓
// 下次就 prev 也变了
if (compareAndSetTail(t, node)) {
    t.next = node;
    return t; 
}

总结

综上,我们来画一个完整的流程图:

alt text
alt text

还有非公平锁和读写锁

AQS:next 指针的不可靠性

unparkSuccessor 方法中,之所以要从后往前(tail -> prev)遍历查找下一个需要唤醒的节点,正是为了解决 addWaiter 或 enq 过程中,CAS 操作与 next 指针赋值之间的时间差问题

enq(Node node) 方法中,新节点入队的代码如下:

node.prev = t;           // 1. 第一步:设置 prev
if (compareAndSetTail(t, node)) { // 2. 第二步:CAS 设置尾节点
    t.next = node;       // 3. 第三步:设置 next
    return t;
}

这三步不是原子的。在多线程环境下,会出现以下极端的“断裂”瞬间:

节点已关联 prev:新节点 node 已经把自己的 prev 指向了老尾部 t。

CAS 成功:tail 已经更新为新节点 node 了。

还没来得及执行 t.next = node:就在这一刻,持有锁的线程释放了锁,进入 unparkSuccessor

如果一个节点刚好被取消,同时又有新节点入队,next 指针的链路可能是不完整的、或者是瞬时错乱的。而 prev 链在 AQS 中具有更强的保证(因为 cancelAcquire 也会处理 prev 指针的重新连接)

  • next 指针:是一种优化,大部分情况下它是好的,但不保证实时准确。

  • prev 指针 + CAS:是强一致性的保证。只要节点进了队列(成为了 tail),它的 prev 就一定是稳固的

上次编辑于: