• 作者:鐡道
  • 分类: java

并发编程AQS源码分析

AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。它是一个Java提高的底层同步工具类,比如CountDownLatch、ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的

简单来说:是用一个int类型的变量表示同步状态,并提供了一系列的CAS操作来管理这个同步状态对象
一个是 state(用于计数器,为0时释放锁)
一个是线程标记(当前线程是谁加锁的),
一个是阻塞队列Node(用于存放其他未拿到锁的线程)

例子:线程A调用了lock()方法,通过CAS将state赋值为1,然后将该锁标记为线程A加锁。如果线程A还未释放锁时,线程B来请求,会查询锁标记的状态,因为当前的锁标记为 线程A,线程B未能匹配上,所以线程B会加入阻塞队列,直到线程A触发了 unlock() 方法,这时线程B才有机会去拿到锁,但是不一定肯定拿到

源码分析阶段

直接看ReentrantLock中的lock方法加锁是如何使用到AQS的、ReentrantLock分为公平锁和非公平锁、默认是非公平锁

公平锁:按照队列先进先出的顺序进行加锁解锁

非公平锁:哪个线程先抢到锁就是哪个的、有可能造成某一个线程一直抢不到锁

非公平锁

// 非公平锁
final void lock() {
    // 进行cas操作、state值为0则赋值为1、成功获取锁
    if (compareAndSetState(0, 1))
        // 设置线程标记、线程标记用来检测是否是重入锁
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 调用AQS中的acquire方法、在调用ReentrantLock类下定义的Sync类中的nonfairTryAcquire方法进行具体的锁操作细节
        // 传参1、里面一直进行for循环CAS赋值、直到哪个线程抢到返回
        acquire(1);
}

// acquire方法是操作state状态位的方法、通过tryAcquire里面的CAS原子操作检测锁状态
public final void acquire(int arg) {
    // tryAcquire调用nonfairTryAcquire方法
    if (!tryAcquire(arg) &&
        // addWaiter: 根据给定模式创建一个当前线程的Node节点并返回、当前是创建一个独占锁的Node节点
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 中断当前线程
        selfInterrupt();
}

final boolean nonfairTryAcquire(int acquires) {
    // 先获取当前线程标识符、用于检测当前对象的线程标识符是否是同一个、同一个则为可重入锁、可直接返回、不是则返回后调用acquireQueued
    final Thread current = Thread.currentThread();
    // 获取计数器、表明当前线程重入了多少次锁
    int c = getState();
    // 为0则代表当前线程的锁全部解锁完毕
    if (c == 0) {
        // 给state计数器加1、代表上锁
        if (compareAndSetState(0, acquires)) {
            // 设置对象的线程标识符为当前线程
            setExclusiveOwnerThread(current);
            // 直接返回
            return true;
        }
    }
    // 代表当前有线程在使用该对象锁、检测是否是同一线程、是则进入、为可重入锁
    else if (current == getExclusiveOwnerThread()) {
        // 计算state
        int nextc = c + acquires;
        // 小于0则代表锁过多、即0x7fffffff + 1为负数
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 赋值state
        setState(nextc);
        // 直接返回
        return true;
    }
    // 返回false、表示当前对象锁被其他线程占用了、需要等到加锁的线程解锁才进行抢占
    return false;
}

//  根据给定模式创建一个当前线程的Node节点并返回
private Node addWaiter(Node mode) {
    // 根据当前线程创建一个Node节点、并传入模式(独占锁、共享锁)
    Node node = new Node(Thread.currentThread(), mode);
    // 获取队列尾部Node节点
    Node pred = tail;
    // 检测队列是否存在尾部节点、即是否存在节点
    if (pred != null) {
        // 新创建的Node节点上一个节点设为队列的尾部节点、然后下面直接将新创建节点插入队列尾部
        node.prev = pred;
        // 通过cas操作更换节点顺序、即新创建的节点为尾部、原先尾节点的下一个节点设置为当前新创建的Node节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 将新创建的Node节点插入队列尾部
    enq(node);
    return node;
}

// 里面for死循环无限调用nonfairTryAcquire方法检测锁是否被释放、然后进行抢占加锁
final boolean acquireQueued(final Node node, int arg) {
    // 错误位、当finally执行时该变量为true、则代表有异常发生、取消当前的操作
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 返回Node节点的上一个节点
            final Node p = node.predecessor();
            // 检测是否是头节点、是的话在进行cas检测锁是否解锁在抢占
            if (p == head && tryAcquire(arg)) {
                // 抢占到锁了、设置当前线程的Node节点为等待队列的头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 检测当前线程是否调用了interrupt中断线程方法、并且检测状态位进行阻塞、调用LockSupport.park(this)方式阻塞自己
                parkAndCheckInterrupt())
                // 如果循环走到这里在return返回的话、代表线程将中断
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
转载自: https://www.cnblogs.com/tie-dao/p/16667138.html