JUC框架 ReentrantReadWriteLock源码解析 JDK8

news/2024/5/20 8:54:24 标签: java, 读写锁, JUC, AQS, 多线程

文章目录

  • 前言
  • 重要成员
    • 内部类关系
    • 构造器
    • Sync的成员
      • 同步器状态的划分
      • 读锁计数部分
  • 写锁的获取和释放
    • 写锁的获取
    • 写锁的释放
  • 读锁的获取和释放
    • 读锁的获取
    • 读锁的释放
  • 锁降级
  • 总结

前言

ReentrantReadWriteLock是我阅读了AQS源码以来最感兴趣的类,因为它不像别的JUC构件只使用独占锁或是共享锁部分,它二者都同时使用了。它主要针对于对共享资源的访问,这些访问往往读写操作都有,但如果当前只有读操作的话,那么允许多个线程同时读来提高效率;如果当前有写操作的话,其他的读操作就得乖乖排队了。

JUC框架 系列文章目录

重要成员

内部类关系

首先要明确ReentrantReadWriteLock中定义的内部类的关系。

java">public class ReentrantReadWriteLock implements ReadWriteLock {
    /** 读锁 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 写锁 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 持有的AQS子类对象 */
    final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {}

    static final class NonfairSync extends Sync {}

    static final class FairSync extends Sync {}

    public static class ReadLock implements Lock {}

    public static class WriteLock implements Lock {}
}

从ReentrantReadWriteLock持有的成员变量来看,ReentrantReadWriteLock与这些内部类对象都是1对1的关系。

构造器

java">    public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public static class ReadLock implements Lock {
    	private final Sync sync;
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }

    public static class WriteLock implements Lock {
    	private final Sync sync;
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }

ReentrantReadWriteLock的构造器默认使用非公平锁。在ReentrantReadWriteLock的构造器中又会去构造ReadLockWriteLock,从这二者的构造器中可见,它持有的AQS对象是同一个,也就是ReentrantReadWriteLockAQS成员。重点在于,ReadLockWriteLock使用的同一个AQS对象,使得可以读写互斥。

Sync的成员

同步器状态的划分

java">    abstract static class Sync extends AbstractQueuedSynchronizer {
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    }

由于写锁和读锁都可以重入,而且读锁还可以被多个线程所持有,但现在AQS的state只是一个int型的变量,所以把state的高16bit作为读锁的计数范围,低16bit作为写锁的计数范围,现在它们各有 0 ∼ 2 16 − 1 0 \sim 2^{16}-1 02161的计数范围了,皆大欢喜。

读锁计数部分

回想一下Semaphore对共享锁的操作,获取共享锁时Semaphore不会去记录是哪个线程拿到了共享锁,释放共享锁时不管是哪个阿猫阿狗都可以来释放共享锁。

给Semaphore打个比喻就是,Semaphore就像是一个装有令牌(permit)的黑箱子,拿到令牌的人才能去做爱做的事情,谁都可以从里面拿走若干令牌,谁都可以把新的令牌扔到里面去,但Semaphore从来不记载谁拿走的令牌。
在这里插入图片描述
但ReentrantReadWriteLock的读锁则不一样,ReentrantReadWriteLock的读锁就像是共享充电宝,各个槽位里的充电宝可以同时被多人使用(读锁同时被多个线程持有),但每个人拿走了充电宝肯定会被手机记录下来(通过ThreadLocal线程私有的HoldCounter对象,手机也是每个人私有),当然一个人也可以拿走多个充电宝(HoldCounter对象的count成员)。

在还充电宝的时候也不能乱还,你从别的机器上来借来的充电宝,或你根本没有充电宝,是不能够还充电宝的(防止没有持有读锁的线程,来释放读锁。当然也是通过线程私有的HoldCounter对象来查看线程是否持有读锁)。只有当机器上原有的充电宝全部都塞回了槽位时,充电宝才能算还完了(所有线程都释放了ReadWriteLock的读锁)。

共享充电宝机器的槽位是固定的(ReadWriteLock的总读锁数量只要不溢出 2 16 − 1 2^{16}-1 2161,就能被获取到)。

所以现在AQS需要记录当前读锁总共被拿走了多少,这个是通过AQS的state的高16bit记录。但还需要分别记录各个线程分别拿走了多少读锁,即重入的读锁次数。

java">        static final class HoldCounter {
            int count = 0;
            // 使用线程id,而不是线程的引用。这样可以防止垃圾不被回收
            final long tid = getThreadId(Thread.currentThread());
        }

        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        private transient ThreadLocalHoldCounter readHolds;
        private transient HoldCounter cachedHoldCounter;
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;

首先我们要明确一个观点,cachedHoldCounter / firstReader / firstReaderHoldCount即使不存在,ReentrantReadWriteLock也是能正常工作的。所以,我们先讲讲HoldCounterThreadLocalHoldCounter的工作原理。

现在需要记录各个线程分别拿走了多少读锁,我们把记录工作交给各个线程自己,通过ThreadLocal让每个线程拥有一个线程私有的HoldCounter对象。如果当前线程没有持有读锁,这个HoldCounter对象为null(因为对ThreadLocal没有使用过get/set);如果当前线程持有着读锁,这个HoldCounter对象不为null,且count成员肯定大于等于1。
PS:在读ReentrantReadWriteLock的源码之前,我的猜想是,通过在AQS子类里增加一个map,key类型为Thread,value类型为int,这样来记录各个线程分别拿走了多少读锁。还是Doug Lea大佬厉害,因为利用ThreadLocal的话,完全不用考虑多线程竞争了。

cachedHoldCounter / firstReader / firstReaderHoldCount存在的理由,仅仅是为了获得当前线程的HoldCounter对象的一次快速尝试,如果快速尝试失败了,才需要通过ThreadLocal来获得当前线程的HoldCounter对象。

  • 在读锁计数从1变成0的这段时间内(期间1可能增长为n),firstReader只可能为两种值,历史上(从1变成0的这段时间)第一个获取读锁成功的线程,或null(当firstReader释放干净读锁了)。
  • 从1变成0的这段时间内,如果firstReader这个线程释放干净读锁后,又重新获取到读锁,firstReader成员也会继续维持null。因为赋值firstReader的时机是当读锁计数从0到1时,才可以去做。
  • 你可以简单的把firstReader / firstReaderHoldCount合起来当作一个HoldCounter
  • cachedHoldCounter一般情况下,这个引用总是指向某个持有读锁的线程的HoldCounter对象。但cachedHoldCounter当好是当前线程的HoldCounter对象这种事情,则完全看缘分(后面会讲到)。

总之,作者认为通过ThreadLocal来获得当前线程的HoldCounter对象可能耗时,所以留着两个快速尝试的后门。这个过程一般是:先看firstReader是否为当线程,再看cachedHoldCounter是否刚好缓存了当前线程的HoldCounter对象,最后实在不行,才通过ThreadLocal来获得当前线程的HoldCounter对象。

另外,这几个成员都不是volatile的原因在读锁的获取章节对tryAcquireShared函数的讲解会说。

写锁的获取和释放

写锁的获取

WriteLock方法调用的AQS方法是否阻塞是否响应中断是否超时机制返回值及含义
lock()sync.acquire(1)--void
lockInterruptibly()sync.acquireInterruptibly(1)-void
tryLock(long timeout, TimeUnit unit)sync.tryAcquireNanos(1, unit.toNanos(timeout))boolean
返回时是否获得了锁
tryLock()sync.tryWriteLock()-boolean
返回时是否获得了锁

写锁的获取总共就这4种方法,只有前3种AQS方法才是有阻塞等待的过程的方法,它们整体过程已经在独占锁的获取过程中讲解过了,所以接下来我们只需要关心AQS子类对tryAcquiretryRelease的重写实现即可。

最后一个方法只是AQS子类的新加方法,它没有阻塞等待的过程(即没有自旋+park),作用只是进行一次性的尝试。

java">    public static class WriteLock implements Lock {
    	private final Sync sync;
    	
        public void lock() {
            sync.acquire(1);
        }
		
		...
    }

从上面的sync.acquire(1)出发,会调用到子类的tryAcquire实现。在此之前,回顾一下tryAcquire返回值的含义,若返回true代表获取独占锁成功,若返回false代表获取独占锁失败。

java">        protected final boolean tryAcquire(int acquires) {
            //获得当前线程 
            Thread current = Thread.currentThread();
            //获得同步器状态
            int c = getState();
            //获得写锁计数
            int w = exclusiveCount(c);
            //如果c不为0,说明有锁,但不知道是什么锁
            if (c != 0) {
                // 进入分支有两种情况:
                // 1.写锁计数为0。说明此时只有读锁,不能将读锁升级为写锁,所以直接返回false
                // 2.写锁计数不为0,但不是当前线程持有的写锁。直接返回false。
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 加上参数的写锁计数,如果溢出了,就抛出异常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 执行到这里,说明肯定是当前线程持有的写锁,那么此时没有线程竞争,
                // 直接set新的写锁计数
                setState(c + acquires);
                return true;
            }
            // 执行到这里,肯定是c == 0,当前既没有读锁,也没有写锁。
            // 但可能有多个线程来竞争这个状态下的任何锁,所以接下来需要通过CAS来竞争
            
            if (writerShouldBlock() ||// 此函数对公平非公平进行了封装,返回false代表在当前公平模式判断下,接下来可以尝试获得锁
                !compareAndSetState(c, c + acquires))// 如果CAS成功,则不会进入此分支
                return false;

			//执行到这里,说明该函数开始检测到 没有任何锁,然后当前线程还获得到了写锁
            setExclusiveOwnerThread(current);
            return true;
        }

具体细节请看注释。我们知道写锁和写锁肯定互斥,写锁也和读锁互斥,所以上面直接返回false的情况挺多的,所以我们不如先说一下返回true的情况(按照程序中的顺序):

  • 之前是由当前线程持有的写锁,所以当前线程现在重入这个写锁。
  • 当前ReentrantReadWriteLock没有任何锁被持有,并且当前线程竞争到了写锁。

直接返回false的情况(按照程序中的顺序):

  • 当前只有读锁,不能将读锁升级为写锁。
  • 当前有写锁,但写锁的持有者不是当前线程。
  • 当前没有任何锁,但判断公平模式后发现当前线程排在了其他线程后面。
  • 当前没有任何锁,判断公平模式后发现可以直接尝试,但CAS竞争失败了。

在两处CAS操作处,设置的新值是c + acquires而不是c + exclusiveCount(acquires),这是因为传入的acquires参数只能是1,高16bit不可能有值。虽然我还是觉得后者更加严谨。

writerShouldBlock这个函数封装掉了 当前是公平还是非公平 的信息,我们只需要知道该函数返回了false,接下来就可以尝试获得写锁;返回了true,接下来不能去尝试获得写锁,且即将进入阻塞状态(详见AQS#acquire)。
而返回false有两种可能性:

  • 该锁的实现允许插队(即非公平实现)。
  • 当前线程排在了队伍的最前面(即公平实现,但此时同步队列中没有等待的线程)。

接下来看一下AQS子类的新加方法tryWriteLock,非公平的、一次性的获取写锁的方法实现:

java">        final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

发现它和上面的tryAcquire方法实现几乎一样,除了:

  • 该函数没有参数,只会让写锁计数加1。
  • CAS操作前,没有判断writerShouldBlock。这就是非公平的体现。
  • 判断溢出变得简单,因为只是加1,所以旧值如果刚好等于最大值,那么再加1肯定溢出。
  • 即使是重入写锁(没有线程竞争),也是使用CAS操作增加写锁计数。

写锁的释放

写锁的释放就没那么复杂了,具体的释放流程请看 独占锁的释放过程,我们只关注AQS子类实现就好。

java">    public static class WriteLock implements Lock {
    	private final Sync sync;
    	
        public void unlock() {
            sync.release(1);
        }
    }
java">        protected final boolean tryRelease(int releases) {
        	//要释放写锁,首先得保证当前线程已经持有了写锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //计算出同步状态的新值
            int nextc = getState() - releases;
            //如果新值的写锁的重入次数为0,那么写锁将被释放
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
            	//如果写锁将完全释放,那么设置ExclusiveOwnerThread成员为null
                setExclusiveOwnerThread(null);
            //不管新值是多少,设置它为state
            setState(nextc);
            return free;
        }

该函数减少相应的写锁计数,只有当新的写锁计数为0时,该函数才会返回true。

读锁的获取和释放

读锁的获取

ReadLock方法调用的AQS方法是否阻塞是否响应中断是否超时机制返回值及含义
lock()sync.acquireShared(1)--void
lockInterruptibly()sync.acquireSharedInterruptibly(1)-void
tryLock(long timeout, TimeUnit unit)sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))boolean
返回时是否获得了锁
tryLock()sync.tryReadLock()-boolean
返回时是否获得了锁

读锁的几种获取方式完全类似于写锁,只有前3种AQS方法才是有阻塞等待的过程的方法,它们整体过程已经在共享锁的获取过程中讲解过了,所以接下来我们只需要关心AQS子类对tryAcquireSharedtryReleaseShared的重写实现即可。

java">    public static class ReadLock implements Lock {
    	private final Sync sync;
    	
        public void lock() {
            sync.acquireShared(1);
        }
        
		...
    }

从上面的sync.acquireShared(1)出发,会调用到子类的tryAcquireShared实现。在此之前,回顾一下tryAcquireShared返回值的含义:

  • 如果返回值大于0,说明获取共享锁成功,并且后续获取也可能获取成功。
  • 如果返回值等于0,说明获取共享锁成功,但后续获取可能不会成功。
  • 如果返回值小于0,说明获取共享锁失败。
java">        protected final int tryAcquireShared(int unused) {
        //首先注意这个参数没有使用的,这个函数目的只是想让读锁计数加1
            Thread current = Thread.currentThread();
            int c = getState();
            //如果当前有写锁被持有,且不是当前线程持有的,则返回-1代表获取失败。
            //因为读写互斥,不能把写锁降级为读锁
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
			//执行到这里,可能1. 当前没有写锁 2. 有写锁但它是被当前线程持有的

			//获得读锁计数
            int r = sharedCount(c);
            if (!readerShouldBlock() &&//判断当前公平模式下,当前线程是否可以尝试获得锁
                r < MAX_COUNT &&//如果读锁计数小于最大读锁计数,那说明至少还允许加1
                compareAndSetState(c, c + SHARED_UNIT)) {//CAS尝试加1单位的读锁计数
                //执行到这里说明 成功将读锁计数加1了,之后的逻辑都属于是善后操作

				//如果获得读锁之前,读锁计数为0,那么说明当前线程是第一个获取到读锁的线程
                if (r == 0) {
                    firstReader = current;//当前线程设置为AQS的firstReader成员
                    firstReaderHoldCount = 1;//之前为0,现在肯定应该为1了
                //当前线程重入了读锁,那么加1就好
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                //如果当前线程是第二个或以后的线程,来获得的读锁
                } else {
                	//获得的HoldCounter指不定是哪个线程的HoldCounter
                    HoldCounter rh = cachedHoldCounter;
                    //如果为null说明从来没有设置过AQS的cachedHoldCounter
                    //如果不为null但线程id不是当前的,说明重新设置
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    //如果cachedHoldCounter的线程id就是当前线程id,且count为0
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    //不管怎样,局部变量rh的count都要加1
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }
  • 从最开始return -1的地方可知,获取读锁会因为当前写锁不是当前线程所持有而直接返回-1。但获取读锁允许写锁是当前线程所持有而继续尝试获得。
  • 在CAS操作compareAndSetState(c, c + SHARED_UNIT)执行成功后,说明当前线程获取共享锁成功,但还需要做一系列的善后操作。
java">private transient ThreadLocalHoldCounter readHolds;//ThreadLocal对象

//设置的成员
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

观察这一系列的善后操作,发现设置的成员都是非volatile的,设置的时候也没有使用CAS操作,难道这些地方不需要多线程竞争的保护吗,我们看了这么多JUC的源码,从来没看过这么诡异的代码。接下来我们来分析原因:

进入if (r == 0)分支,说明获得读锁之前,读锁计数为0,那么说明当前线程是第一个获取到读锁的线程,当前线程将读锁计数从0变成了1。这个分支内的代码不存在多线程竞争,所以不需要什么保护。分析如下:

  • 进入这个分支只可能是,第一个获取到读锁的线程。
  • 既然只有一个线程能进入此分支,所以不存在多线程竞争。

进入else if (firstReader == current)分支,说明说明获得读锁之前,读锁计数不为0,但当前线程还是第一个获取到读锁的线程,只不过当前线程将读锁计数从n变成了n+1。这个分支内的代码也不存在多线程竞争,所以不需要什么保护。分析如下:

  • ReentrantReadWriteLock的读锁计数从1变回0的期间,firstReader要么就是这期间第一个获取到读锁的线程,要么为null,不可能是别的值。
    • 如果firstReader为null,说明第一个获取到读锁的线程,已经完全释放了读锁。
  • firstReader == current说明进入此分支,只可能是第一个获取到读锁的线程,且firstReader的有效的值只能为一个线程,所以不存在多线程竞争。

进入else if (firstReader == current)的else分支,当前线程是历史上(读锁计数从1变回0的期间)第二个或以后的线程,来获得读锁。这个分支内的代码存在多线程竞争的情况,即有可能多个线程都在执行这个分支内的代码,虽然如此,也不需要进行什么保护。分析如下:

这段代码执行起来真的很“乱”:

  1. 首先,多个线程同时执行这段代码,它们的语句执行顺序完全没有保证,即完全有可能这些语句都是交叉执行的。主要是对cachedHoldCounter的赋值,哪个线程都可能先执行。
  2. 其次,cachedHoldCounter不是volatile的,就算线程对它进行了赋值,其他线程也可能不能马上看到。
  3. 综上,我们读取到cachedHoldCounter到底是哪个线程的HoldCounter真的是完全随缘的,注释也说了这个cachedHoldCounter是heuristic启发式的。

This is non-volatile since it is just used as a heuristic, and would be great for threads to cache.

翻译过来就是,cachedHoldCounter之所以不是volatile的,是因为它是启发式的,你要是通过cachedHoldCounter获取到的HoldCounter对象刚好是当前线程的HoldCounter对象,那算你运气好。

有了以上认知,我们本着cachedHoldCounter随缘的原则,再来看分析else if (firstReader == current)的else分支。

  • if (rh == null || rh.tid != getThreadId(current))分支进入有两个原因。
    • 如果因为rh == null(可能只是因为可见性,导致看到的是null),说明AQS还没有设置过cachedHoldCounter,那么从当前线程通过ThreadLocal获取到HoldCounter对象(readHolds.get()),再将其赋值出去(cachedHoldCounter = rh = readHolds.get())。
    • 如果因为rh != nullrh的线程id不是当前线程id,说明当前线程是第3个或以后来获取共享锁并设置rh的线程。(第一个线程是赋值给current,第二个线程赋值给cachedHoldCounter导致cachedHoldCounter不为null)
  • 进入else if (rh.count == 0)分支,说明cachedHoldCounter就是当前线程私有的那个HoldCounter对象,那就最起码不用去readHolds.get()了。但进入此分支说明rh.count == 0,此时当前线程私有的HoldCounter对象为null,但AQScachedHoldCounter还保留着设置为null之前的那个对象,所以直接把这个HoldCounter对象设置给当前线程的ThreadLocal就好。
    • 这种情况需要结合读锁的释放过程理解,从之后的讲解可知,线程完全释放读锁时,只会清空ThreadLocal的HoldCounter对象,而不会去在意AQScachedHoldCounter成员是否是当前线程的HoldCounter对象。
  • 这段“不靠谱”的代码,最靠谱的竟然是rh局部变量,在执行到rh.count++时,能保证rh肯定是当前线程的线程私有(ThreadLocal的)的那个HoldCounter对象。

tryAcquireShared的整个逻辑都可以称之为一次fastPath快速尝试,从if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT))的逻辑来看,这次快速尝试很容易就会失败的,如果tryAcquireShared失败了,就需要调用fullTryAcquireShared来一次完全的尝试

java">final int fullTryAcquireShared(Thread current) {
	//只要rh不为null,那么它一定指向当前线程的HoldCounter对象
    HoldCounter rh = null;
    for (;;) {
        int c = getState();

		/*第一部分*/
        if (exclusiveCount(c) != 0) {//如果写锁被持有
            if (getExclusiveOwnerThread() != current)//如果写锁不是当前线程持有
                return -1;
            /*else {
            	如果写锁就是当前线程持有的,我们啥也不干,直接执行下一段代码
            }*/
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
            
		//如果写锁没有被持有,且当前线程排在其他线程后面
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            // 虽然readerShouldBlock返回了true,但如果当前线程已经获得了读锁,从语义上来说,
            // 当前线程是可以继续重入的,这也不属于插队的行为。
            // 反之,如果当前线程没有持有着读锁,说明此时再去尝试获得读锁就真的是插队的行为了,
            // 所以,如果发现是这种情况,则直接返回-1,让当前线程去走阻塞等待的流程。
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                // 进入这种情况,能保证当前线程当前是持有着读锁的,因为current肯定不为null,
                // 所以firstReader现在也不为null,它不为null,firstReaderHoldCount肯定也不为0
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();//当前线程如果没有获得读锁,get到的肯定是0的count
                        if (rh.count == 0)
                            readHolds.remove();//当前线程没有获得读锁时,本来它的HoldCounter成员本来就应该为null,所以要remove
                    }
                }
                //rh局部变量还保留着当前线程的HoldCounter成员的引用
                if (rh.count == 0)
                    return -1;
            }
        }

		/*第二部分*/
		//执行到这里说明,当前线程接下来可以尝试获得读锁

        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

从整个逻辑来看,fullTryAcquireShared是利用自旋来不停尝试获得读锁,直到成功为止,这就是为什么称它为完全的尝试。不过函数退出不止会因为成功获得锁而退出(返回1),也会因为当前线程不符合继续获得锁的条件而退出(返回-1)。

我们把自旋的逻辑分为两个部分:

  • 第一部分负责判断当前线程符不符合继续获得锁的条件,如果不符合则返回-1退出自旋;如果符合,则继续执行第二部分。
  • 第二部分负责CAS修改同步器的状态,如果修改成功,则继续完成善后操作;如果修改失败,继续下一次循环。

看第一部分的代码,我们要把重点放在分支的末端上,并分析各个末端的处理:

  • if (exclusiveCount(c) != 0)分支进入,说明写锁被持有着呢。
    • if (getExclusiveOwnerThread() != current)分支进入,说明写锁不是当前线程持有。当前线程不符合继续获得锁的条件,退出。
    • if (getExclusiveOwnerThread() != current)的else分支进入(实际上没有这个分支,我用注释标注出来了),说明写锁是当前线程持有。那么直接执行第二部分代码,尝试获得读锁。这说明只要当前线程持有了写锁,那么不管sync queue中有哪些节点,当前线程都可以继续获得读锁。

else we hold the exclusive lock; blocking here would cause deadlock.

关于上一条分析,注释中有解释。换句话说,如果if (getExclusiveOwnerThread() != current)的else分支进入后,也返回了-1,就会造成死锁。这是因为tryAcquireShared返回值的含义在整个共享锁获取过程中起到的作用,如果tryAcquireShared返回了-1,说明获取共享锁失败,当前线程即将进入阻塞状态,但当前线程已经获得了写锁了,它要是阻塞了,谁来唤醒它呢。所以,这里不能返回-1。

  • else if (readerShouldBlock())分支进入,说明写锁没有被持有,且当前线程排在其他线程后面,即sync queue中至少有一个head后继。
  • readerShouldBlock()这里还返回了true,说明在当前公平模式下,当前线程排在了别的线程后面,但有的奇怪的是,这种情况难道不应该直接返回-1吗?
    • 之所以还有后继的判断,是因为需要区别当前线程是重入读锁的这种情况。如果是当前线程重入读锁(当前线程之前已经获得过至少一次读锁了),那么不需要理会排队信息,直接去执行第二部分代码即可;如果是当前线程第一次想获取读锁,那么则需要理会排队信息,即直接返回-1退出自旋,然后乖乖排队去。
    • 当前线程重入读锁,不需要理会排队信息。这个可以打个比方,比如你去餐厅吃饭,吃到一半你到餐厅外面上个厕所,等你回来的时候,门口排着长队,你想进去继续吃饭,但服务员不让你进去,因为人人都需要排队。这个时候,你告诉服务员,我之前在里面已经吃了一半(线程之前获得过读锁),但现在我想重新进去(线程重入了读锁)。服务员知道了也就不拦你,让你进去了。
    • if (firstReader == current)分支进入,说明firstReader不为null,从读锁的释放过程来看,只要firstReader不为null,那么firstReaderHoldCount肯定大于0。既然大于0,说明当前线程是在重入读锁,所以给当前线程放行,继续执行第二部分。
    • if (firstReader == current)的else分支进入,说明当前线程不是firstReader,看来没法通过方便的firstReader来判断,只能依靠其他东西。
      • 如果rh为null,获取到当前线程的HoldCounter对象作为赋值给rh。从整个函数逻辑来看,局部变量rh只要不为null,就肯定是当前线程的HoldCounter对象。整个获取的手法,和tryAcquireShared中的手法类似。重点在于,只要执行到if (rh.count == 0)(指第一条)时,rh就已经是当前线程的ThreadLocal的HoldCounter对象了。
      • 这里需要分两种情况(重入读锁、第一次获取读锁),如果是第一次获取读锁这种情况,那么执行readHolds.get()之前,当前线程是没有HoldCounter对象的(这一点可以从读锁的释放过程得知)。所以readHolds.get()得到的肯定是一个初始的HoldCounter对象,count肯定为0,发现是这种情况,则需要及时清空当前线程的HoldCounter对象(readHolds.remove()),以维持“没有持有读锁时,线程肯定没有ThreadLocal的HoldCounter对象”的规则。接下来第二个if (rh.count == 0)判断会成立就会直接退出循环了。
      • 如果是重入读锁这种情况,那么执行readHolds.get()之前,当前线程是拥有HoldCounter对象的,且count肯定是大于0的。接下来第二个if (rh.count == 0)判断,也不会进入。所以会顺利执行到第二部分。

Make sure we’re not acquiring read lock reentrantly

到这里,终于分析完毕了第一部分的代码。总之,第一部分的else if (readerShouldBlock())分支总结起来就是上面这句话,避免在重入读锁的时候直接返回-1,只有在线程第一次获得读锁时才可以返回-1。

接下来看看第二部分的代码,这部分其实和tryAcquireShared里成功CAS修改state的善后操作一样,语句略有不同,但实质完全一样,所以就不赘述了。

差点忘了还有个tryReadLock,我们最后看看这个函数实现:

java">        final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                	//之后都是善后操作
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

逻辑很简单,不停自旋,直到成功获得读锁(返回true),或者写锁被别人持有(返回false)。

读锁的释放

读锁的释放就没那么复杂了,具体的释放流程请看共享锁的获取与释放,我们只关注AQS子类实现就好。

java">    public static class ReadLock implements Lock {
    	private final Sync sync;
    	
        public void unlock() {
            sync.releaseShared(1);
        }
    }
java">        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // 进入这个分支,能保证firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)//如果将从1变成0,那么只清空firstReader
                    firstReader = null;
                else//如果当前大于1,那么减小firstReaderHoldCount
                    firstReaderHoldCount--;
            } else {
            	//获取当前线程的HoldCounter的老套路
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                //执行到这里,rh局部变量已经是当前线程的HoldCounter了
                int count = rh.count;
                if (count <= 1) {//如果count为0,说明当前线程没有持有读锁中,HoldCounter是get()新生成的
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                //执行到这里,说明当前线程持有读锁中,那么减小读锁计数1
                --rh.count;
            }
            //此时读锁计数已成功减1,但同步状态却还没修改
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
					//只有在读写锁都是干净的情况,才返回true
                    return nextc == 0;
            }
        }
  • 如果firstReader为null,说明历史上第一个reader已经完全释放干净读锁了。反之,无法通过firstReaderHoldCount == 1推导出firstReader不为null。
  • 返回的是nextc == 0,只有在读写锁都是干净的情况,才返回true。这里有点疑问是,在以前讲解的共享锁的释放过程中,是一定要让tryReleaseShared返回true以便接下来调用doReleaseShared来唤醒后面的共享锁节点,难道当前线程释放读锁后,因为别的线程还持有着读锁,所以还是得返回false?
    • 之所以这么做,是因为读锁在获取过程中由于读读不互斥所以基本不会阻塞等待(指当前写锁没有被其他线程持有的情况),而且就算同步队列中有连续的几个共享锁,唤醒后面共享锁节点的任务都在 共享锁获取成功时就做掉了。所以读锁释放成功时,一般不需要返回true。
    • 而返回的true的情况是,释放读锁后,当前读写锁都是干净的,这个时候来唤醒写锁节点才合适。因为写 和读写 都是互斥的。
    • 综上,tryReleaseShared返回true的原因是,为了唤醒写锁节点,在当前读写锁都没被持有的情况下。

锁降级

从本文的分析来看,一个线程持有写锁后,可以继续去持有读锁,如果在这之后,这个线程释放了写锁,那么就称写锁现在降级为了读锁。

上面这个过程,细说的话,应该分为两个部分:

  1. 一个线程持有写锁后,继续去持有读锁——锁的重入。
  2. 同时持有读写锁后,先释放了写锁——锁降级。

在上面fullTryAcquireShared的讲解中,解释了“一个线程持有写锁后,可以继续去持有读锁”的必要性,如果不允许继续去持有读锁,转而进入阻塞等待的过程,会造成死锁的。

如果一个线程持有了读锁,不能继续去持有写锁,从而锁升级。因为可能当前不止有一个线程都持有了读锁,你再去获得写锁是不合理的。

总结

  • 同步器的state被划分为两个部分,分别记录被拿走的读锁和写锁的总数。
  • 分别记录各个线程拿走的读锁的工作交给了各个线程自己,通过ThreadLocal实现。
  • 不仅写锁可以重入(这类似于ReentrantLock),读锁也可以重入。
  • 尝试获取写锁时,会因为其他写锁或任意读锁(包括自己)的存在,而进入阻塞等待的过程,抛入sync queue中去。
  • 尝试获取读锁时,会因为其他写锁(不包括自己的写锁)的存在,而进入阻塞等待的过程,抛入sync queue中去。
  • 读锁的非公平获取中,apparentlyFirstQueuedIsExclusive 一定概率防止了写锁无限等待。
  • 锁降级是指,一个线程同时持有读锁和写锁后,先释放了写锁,使得写锁降级为了读锁。

http://www.niftyadmin.cn/n/768191.html

相关文章

JUC集合类 CopyOnWriteArrayList源码解析 JDK8

文章目录前言核心成员常用方法getsetaddremoveremove(int index)remove(Object o)index > lenindex < lenfindIndex代码块之后cleartoArray迭代器总结前言 CopyOnWriteArrayList 是一种写时复制的ArrayList&#xff0c;它将读操作和写操作的情形区分开来&#xff0c;并在…

JUC集合类 CopyOnWriteArraySet源码解析 JDK8

文章目录前言与CopyOnWriteArrayList不同之处addIfAbsentaddAllAbsent总结前言 类似于上一篇讲的CopyOnWriteArrayList&#xff0c;CopyOnWriteArraySet可以认为是一个写时复制的HashSet。 但CopyOnWriteArraySet的底层实现完全依赖了CopyOnWriteArrayList&#xff0c;它持有…

JUC集合类 ConcurrentSkipListMap源码解析 JDK8

文章目录前言源码注释术语节点定义构造器get 查找操作返回情况findPredecessorput 插入操作假设新建层数没有超过最大层数假设新建层数超过了最大层数返回情况remove 删除操作findNodetryReduceLevel返回情况marker存在的必要性如果marker不存在marker存在时总结前言 Concurre…

ThreadLocalRandom#getProbe #advanceProbe浅析

前言 Briefly, a thread’s “probe” value is a non-zero hash code that (probably) does not collide with other existing threads with respect to any power of two collision space. When it does collide, it is pseudo-randomly adjusted (using a Marsaglia XorShif…

JUC集合类 ConcurrentHashMap源码解析 JDK8

文章目录前言常量成员节点类构造器put 插入操作加锁情况红黑树的binCount固定为2返回情况spreadinitTablehelpTransferresizeStampsizeCtl的低16bit退出循环的条件treeifyBintryPresizeaddCount计数部分计数部分结束时扩容部分CAS失败影响扩容fullAddCountwasUncontended的作用…

JDK8 ConcurrentHashMap的Bug集锦

前言 JDK8的ConcurrentHashMap并不是完美的&#xff0c;从https://bugs.openjdk.java.net/projects/JDK/issues上也可以看到JDK的很多Bug&#xff0c;当然&#xff0c;通过给concurrency-interest发邮件也可以和Doug Lea直接对话。 最重要的是&#xff0c;知道了这些bug的存在…

JUC集合类 ConcurrentLinkedQueue源码解析 JDK8

文章目录前言概述不变式基本不变式headtail初始化队列初始化Node初始化add/offer 入队操作出队操作pollpeekfirstremove 删除操作remove的bugsize 弱一致性的方法addAll迭代器总结前言 ConcurrentLinkedQueue是一种FIFO&#xff08;first-in-first-out 先入先出&#xff09;的…

JUC集合类 ConcurrentLinkedDeque源码解析 JDK8

文章目录前言概述linkFirst 入队pollFirst 获取并出队first()succ()unlink()unlinkFirstskipDeletedPredecessorsupdateHeadupdateTailgc-unlinking松弛阈值unlink的Unlink interior node逻辑peekFirst 仅获取remove 删除操作size迭代器总结前言 ConcurrentLinkedDeque是一个无…