JUC框架 Semaphore源码解析 JDK8

news/2024/5/20 10:19:55 标签: Java, Semaphore, JUC, AQS, 多线程

文章目录

  • 前言
  • 重要成员
  • 构造器
  • 获取信号量(减小state)
    • 非公平实现的tryAcquireShared
    • 公平实现的tryAcquireShared
  • 释放信号量(增加state)
  • 工具方法
    • tryAcquire
    • reducePermits
    • drainPermits
  • 总结

前言

Semaphore多线程协作中常常用来控制公共资源的访问,限制同时访问数量。

打个比方,Semaphore就像是一个装有令牌(permit)的黑箱子,拿到令牌的人才能去做爱做的事情,谁都可以从里面拿走若干令牌,谁都可以把新的令牌扔到里面去,但Semaphore从来不记载谁拿走的令牌。

重要成员

和CountDownLatch一样,Semaphore依赖的也是AQS的共享锁,核心属性也是AQS的state成员。在Semaphore中,使用内部类继承AQS使用。

构造器

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

默认构造器使用非公平锁,可以通过参数fair来得到公平锁。参数permits最终会赋值给AQS的state成员。

获取信号量(减小state)

Semaphore方法调用的AQS方法是否阻塞是否响应中断是否超时机制返回值及含义
acquire()sync.acquireSharedInterruptibly(1)-void
acquire(int permits)sync.acquireSharedInterruptibly(permits)-void
acquireUninterruptibly()sync.acquireShared(1)--void
acquireUninterruptibly(int permits)sync.acquireShared(permits)--void
tryAcquire(long timeout, TimeUnit unit)sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))boolean
返回时是否获得了锁
tryAcquire(int permits, long timeout, TimeUnit unit)sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout))boolean
返回时是否获得了锁
tryAcquire()sync.nonfairTryAcquireShared(1) >= 0-boolean
返回时是否获得了锁
  • 上面表格中,所有没有参数的Semaphore方法,实质上都是在获得单位1的信号量。
  • 调用的AQS方法,其实只有三种,acquireShared(共享锁的获取与释放中已经进行了讲解),acquireSharedInterruptiblytryAcquireSharedNanos(CountDownLatch源码解析中已经进行了讲解)。这三者的区别已经在表格列出。
    • 需要响应中断,方法声明会抛出中断异常。
    • 有超时机制,就需要用返回值区别 获得锁返回 和 超时都没获得到锁 两种情况。
    • 这三个方法都需要调用到AQS子类实现的tryAcquireShared,该方法用来获取共享锁,子类可以将其实现公平锁或是非公平锁。
  • nonfairTryAcquireShared其实不大应该放在上表里面,因为它根本不是AQS的方法,只是AQS子类的新加方法。因为它根本没有阻塞等待的过程,只是简单的try一次,成功失败听天由命,所以它根本不会阻塞。

表格中,带permits参数的方法可以获得单位不为1的信号量,但是方法中对permits参数的检查要求是>=0,也就是说,允许线程获得0单位的信号量,虽然我感觉这样没有任何意义。

这里我们再复习一下tryAcquireShared返回值的含义:

  • 如果返回值大于0,说明获取共享锁成功,并且后续获取也可能获取成功。
  • 如果返回值等于0,说明获取共享锁成功,但后续获取可能不会成功。
  • 如果返回值小于0,说明获取共享锁失败。

非公平实现的tryAcquireShared

//NonfairSync
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

//Sync
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || 
            compareAndSetState(available, remaining))
            return remaining;
    }
}
  • 方法使用了自旋,这是合理且必要的。共享锁是共享的,自然可能有多个线程正在同时执行上面的代码,即使失败了也不能退出循环,而是应该失败后再得到当前值,然后再次CAS尝试。
  • 如果remaining算出来小于0,说明剩余信号量已经不够拿的了,那就直接返回remaining这个负数(表达获取共享锁失败),不做CAS操作。
  • 如果remaining算出来大于等于0,说明剩余信号量够拿的,紧接着如果CAS设置成功,就返回remaining这个大于等于0的数(表达获取共享锁成功)。
  • 这个方法想要退出,只有当前线程拿到了想要数量的信号量,或剩余信号量已经不够拿。

公平实现的tryAcquireShared

//FairSync
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

区别就是要先判断同步队列中是否已经有节点了(hasQueuedPredecessors),如果有那同步队列中的节点属于是排在当前线程之前的,所以只好直接返回-1。

释放信号量(增加state)

释放信号量就不用区别什么公平不公平了。

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
  • 考虑多线程,使用自旋保证releases单位的信号量能够释放到位。
  • 只有CAS设置成功,或溢出int型的范围,才能退出这个循环。

工具方法

tryAcquire

上面表格里,一直没有好好讲tryAcquire,因为它在里面属于一个异类,没有阻塞等待的过程。

    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

nonfairTryAcquireShared的实现也看过了,里面根本没有调用过LockSupport.park,确实没有阻塞。不要被自旋所迷惑,自旋也不是阻塞,而且这个自旋过程一般情形下很快就会结束。

简而言之,Semaphore#tryAcquire的作用就是尝试 一次性的、非公平的 获得锁动作。注意这种一次性动作一定要是非公平实现的,不然大部分情况下(同步队列中只要有一个线程在等待),这种一次性动作肯定不能成功。这也是为什么要把非公平实现放到NonfairSyncFairSync的父类里的一个公共方法里。

注意,返回值进行了处理,如果获得共享锁成功(nonfairTryAcquireShared返回值>=0),返回true;如果获得共享锁失败(nonfairTryAcquireShared返回值<0),返回false。

reducePermits

//Semaphore
protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

//Sync
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}
  • 这个方法的作用也是获得信号量,只不过这个函数相比nonfairTryAcquireShared的实现,它允许改变后的信号量是负数。
  • 自旋的过程。想要退出函数,只有CAS操作成功或者向下溢出了。
  • if (next > current)分支进入的原因只能是int型变量向下溢出,因为reductions被保证是一个>=0的数。
  • protected方法,用起来不是那么方便。
//Sync
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || 
            compareAndSetState(available, remaining))
            return remaining;
    }
}

放一下nonfairTryAcquireShared的实现作对比。

drainPermits

//Semaphore
public int drainPermits() {
    return sync.drainPermits();
}

//Sync
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

自旋的过程,直到信号量被清空为0。

总结

  • SemaphoreAQS子类是一个很标准的共享锁实现。
    • 获得信号量 == 减小AQS的state。
    • 释放信号量 == 增加AQS的state。
  • 共享锁的AQS子类实现方法需要自旋,这一点在Semaphore和CountDownLatch都有体现。独占锁的AQS子类实现方法不需要自旋。
  • 获得信号量失败一定是因为信号量已经减到0了,且获得失败就会阻塞。

http://www.niftyadmin.cn/n/768193.html

相关文章

ReentrantReadWriteLock的readerShouldBlock与apparentlyFirstQueuedIsExclusive 深入理解读锁的非公平实现

文章目录前言writerShouldBlock的非公平实现readerShouldBlock的非公平实现写锁无限等待 indefinite writer starvationheuristic启发式地防止new reader总结前言 在ReentrantReadWriteLock的读锁或写锁的获取过程中&#xff0c;在CAS修改同步器状态之前&#xff0c;会使用rea…

JUC框架 ReentrantReadWriteLock源码解析 JDK8

文章目录前言重要成员内部类关系构造器Sync的成员同步器状态的划分读锁计数部分写锁的获取和释放写锁的获取写锁的释放读锁的获取和释放读锁的获取读锁的释放锁降级总结前言 ReentrantReadWriteLock是我阅读了AQS源码以来最感兴趣的类&#xff0c;因为它不像别的JUC构件只使用…

JUC集合类 CopyOnWriteArrayList源码解析 JDK8

文章目录前言核心成员常用方法getsetaddremoveremove(int index)remove(Object o)index > lenindex < lenfindIndex代码块之后cleartoArray迭代器总结前言 CopyOnWriteArrayList 是一种写时复制的ArrayList&#xff0c;它将读操作和写操作的情形区分开来&#xff0c;并在…

JUC集合类 CopyOnWriteArraySet源码解析 JDK8

文章目录前言与CopyOnWriteArrayList不同之处addIfAbsentaddAllAbsent总结前言 类似于上一篇讲的CopyOnWriteArrayList&#xff0c;CopyOnWriteArraySet可以认为是一个写时复制的HashSet。 但CopyOnWriteArraySet的底层实现完全依赖了CopyOnWriteArrayList&#xff0c;它持有…

JUC集合类 ConcurrentSkipListMap源码解析 JDK8

文章目录前言源码注释术语节点定义构造器get 查找操作返回情况findPredecessorput 插入操作假设新建层数没有超过最大层数假设新建层数超过了最大层数返回情况remove 删除操作findNodetryReduceLevel返回情况marker存在的必要性如果marker不存在marker存在时总结前言 Concurre…

ThreadLocalRandom#getProbe #advanceProbe浅析

前言 Briefly, a thread’s “probe” value is a non-zero hash code that (probably) does not collide with other existing threads with respect to any power of two collision space. When it does collide, it is pseudo-randomly adjusted (using a Marsaglia XorShif…

JUC集合类 ConcurrentHashMap源码解析 JDK8

文章目录前言常量成员节点类构造器put 插入操作加锁情况红黑树的binCount固定为2返回情况spreadinitTablehelpTransferresizeStampsizeCtl的低16bit退出循环的条件treeifyBintryPresizeaddCount计数部分计数部分结束时扩容部分CAS失败影响扩容fullAddCountwasUncontended的作用…

JDK8 ConcurrentHashMap的Bug集锦

前言 JDK8的ConcurrentHashMap并不是完美的&#xff0c;从https://bugs.openjdk.java.net/projects/JDK/issues上也可以看到JDK的很多Bug&#xff0c;当然&#xff0c;通过给concurrency-interest发邮件也可以和Doug Lea直接对话。 最重要的是&#xff0c;知道了这些bug的存在…