JUC集合类 DelayQueue源码解析 JDK8

news/2024/5/20 7:19:03 标签: java, DelayQueue, JUC

前言

DelayQueue是一个无界阻塞队列,它和PriorityBlockingQueue一样是一个优先队列,但区别在于队列元素只能放置Delayed对象,而且只有元素到期后才能将其出队。

内部是一个最小堆,堆顶永远是最先“到期”的那个元素。如果堆顶元素没有到期,即使线程发现队列中有元素,也不能将其出队。

DelayQueue需要依赖于元素对Delayed接口正确实现,即保证到期时间短的Delayed元素.compareTo(到期时间长的Delayed元素) < 0,这样可以让到期时间短的Delayed元素排在队列前面。

JUC框架 系列文章目录

成员

java">//非公平的锁
private final transient ReentrantLock lock = new ReentrantLock();

//使用PriorityQueue存储元素,是个最小堆
private final PriorityQueue<E> q = new PriorityQueue<E>();

//Leader-Follower线程模式中的Leader,它总是等待获取队首
private Thread leader = null;

//不管哪种线程都将阻塞在这个条件队列上。但Follower可能是无限的阻塞
private final Condition available = lock.newCondition();

Leader-Follower

首先我们想一个问题,在队列中的处于队首的Delayed元素,由于还没到期,只能暂时等待等到它到期,这种暂时等待必然需要使用到Condition.awaitNanos。虽然第一个来的线程是可以明确知道要等队首元素多久(通过getDelay),但第二个或以后来的线程就不知道该等多久了,明显它们应该去等待排名第二或以后的元素,但奈何优先队列是个最小堆,最小堆只能时刻知道最小元素是谁。

所以,干脆让第二个或以后来的线程无限阻塞(Condition.await),但我们让第一个线程负责唤醒沉睡在条件队列上的线程。因为第一个线程总是使用Condition.awaitNanos,所以不会造成条件队列上的线程睡到天荒地老。第一个线程总是等待获得队堆顶,当它出队成功后,再唤醒后面的线程去获得新堆顶。

上面说的第一个线程其实就是Leader-Follower模式中的Leader了,它总是会以Condition.awaitNanos的方式阻塞,这保证了它不会一直沉睡。而其他线程就是所谓的Follower,当它们检测到Leader的存在时,则可以放心使用Condition.await,就好像调好了闹钟所以可以放心大胆睡觉一样。

入队

java">    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
  • lock.lock()入队不响应中断,也没有必要响应中断。毕竟DelayQueue是无界队列,不可能出现因队列满而阻塞的情况,也就不用响应中断了。
  • if (q.peek() == e)成立,说明新元素入队后成为了堆顶,说明最小元素更新了。这也说明了之前的leader(如果存在的话)调用的awaitNanos的参数偏大了,因为现在有了更小的元素进来。那么干脆清空leader(也有可能leader本来就是null,即使条件队列里有线程),唤醒条件队列第一个线程,让leader以更小的参数调用awaitNanos
  • if (q.peek() == e)不成立,说明之前的leader(如果存在的话)调用的awaitNanos的参数还是正确的,所以也就不需要什么操作。
  • 理论上,该函数不可能失败,只会返回true。

出队

take

take函数完美解释了Leader-Follower模式。

java">    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//抢锁过程响应中断,即可能这里抛出空指针异常
        try {
            for (;;) {
                E first = q.peek();
                //如果队列为空
                if (first == null)
                    available.await();
                //如果队列不为空
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)//队列首元素已经到期,此时可以取出
                        return q.poll();
                    //队列首元素还没到期
                    first = null; //接下来将阻塞,阻塞期间不要持有元素引用,以免内存泄漏
                    if (leader != null) //如果leader有线程占领了,那么直接进入条件队列
                        available.await();
                    else { //如果leader还没有线程占领
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;//当前线程占领leader
                        try {
                            available.awaitNanos(delay);//不是无限阻塞,而是有时间的阻塞,阻塞期间一直占领leader
                        } finally {
                            //阻塞结束后,当前线程放弃需暂时放弃leader身份
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
  • if (first == null),如果队列为空,每个线程进来都会无限阻塞。当第一个元素入队时,offer里的if (q.peek() == e)成立(空堆加入元素自然是堆顶啦),然后会唤醒条件队列里的第一个线程。
  • 如果队列不为空,但队首元素又没有到期,那么每个线程接下来都将要阻塞了,但都会尝试成为leader。
    • if (leader != null),如果leader不为null,那就放心去无限阻塞。
    • 如果leader为null,那么当前线程即将成为leader。但leader不能无限阻塞,而且现在知道堆顶元素的到期时间,所以直接available.awaitNanos(delay)
      • 当leader从available.awaitNanos(delay)返回时,将执行finally块,然后清空掉自己的leader身份(如果自己是的话)。从await返回可能是队首元素到期了(接下来将return),也可能发现队列为空(因为remove,然后当前线程也无限阻塞),也可能发现队首元素还是没有到期,然后重新获得leader身份。总之,在此期间是持有锁的,不用担心别的线程来修改leader,大不了在再次阻塞前重新获得leader身份。
      • 一定要清空掉自己的leader身份,因为当前线程已经离开条件队列了。leader之所以为leader就是因为它阻塞于条件队列中,且它负责唤醒条件队列中的其他线程。
      • 注意available.awaitNanos(delay)完全可能因为抛中断异常而返回,但也会执行finally块,然后清空掉自己的leader身份。都抛出异常了,自然也不能继续占着leader身份了。
  • 两处available.await()也会可能会抛出中断异常的。所以本函数退出的原因有4个:1. 两处available.await()抛出异常 2. available.awaitNanos(delay)抛出异常 3. return q.poll()正常return。
    • 本函数退出时,都会执行最后的finally块。这个finally块在leader线程抛出中断异常发挥重要作用,这种情况leader线程先执行第一个finally块保证清空掉自己的leader身份,然后执行第二个finally块里的available.signal()负责唤醒条件队列里的线程。当然,leader线程正常return时(它刚刚清空掉自己的leader身份,但还是这样称呼它比较好理解),也会执行第二个finally块里的available.signal()
    • 最后的finally块很有必要,因为available.signal()负责唤醒条件队列中的线程,从而避免Follower无限阻塞。

简单总结一下:

  1. Leader执行available.awaitNanos(delay),进行限时的阻塞。
  2. Follower执行available.await(),进行无限的阻塞。
  3. Leader线程在退出take函数时会唤醒一个沉睡在条件队列上的Follower,所以Follower实际上不会一直阻塞下去。
  4. 每个线程在阻塞前都会尝试成为Leader,否则成为Follower。同时只有一个Leader。
  5. Leader在阻塞期间一直都是Leader身份(Leader == 当前线程),但唤醒后马上清空掉自己的Leader身份(Leader = null),之后一段时间由于一直持有锁(这里指退出take或再次阻塞之前),所以也不用担心别的线程修改Leader。
    1. 如果 退出take,退出前将唤醒一个沉睡在条件队列上的Follower。
    2. 如果再次阻塞,那么重新获得Leader身份。反正一直持有着锁,当确定了要重新当Leader后再获得Leader身份也不迟。

内存泄漏

take函数中,first = null用来防止内存泄漏。简单的说,每个线程在阻塞期间都不持有堆顶元素的引用。

假设没有这句,看看内存泄漏是怎么发生的:

  1. 线程ABC先后调用take
  2. 线程A是Leader,它唤醒后首先出队 堆顶元素。处理完这个元素后,元素原本应该被GC掉。
  3. 线程BC还持有该元素引用。即使线程B马上被唤醒,线程C也还在阻塞中,必然这个元素不能被GC掉。
  4. 造成了内存泄漏。

超时poll

该函数最大的特点就是,无论哪种情况,阻塞都是使用awaitNanos进行限时的阻塞。

java">    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)//没有剩余等待时间了,只好返回null
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)  //队首元素已经到期,取出
                        return q.poll();
                    //队首元素还没到期
                    if (nanos <= 0)  //但没有剩余等待时间了,只好返回null
                        return null;
                    first = null; 
                    //1. nanos < delay,说明当前线程肯定等不到队首元素了
                    //2. nanos >= delay但leader != null, 前者说明当前线程能等到队首元素,
                    //   但已经有leader了,那就让leader来唤醒自己
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    // nanos >= delay且leader == null, 前者说明当前线程能等到队首元素
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;//右边计算出消耗的时间
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
  • 返回情况多了一种,那就是如果if (nanos <= 0)剩余等待时间小于等于0,那就说明用户已经不想等了,直接返回null。
  • 无论是线程为空时,或是当前线程是一个Follower时,都改用available.awaitNanos(nanos)进行限时的阻塞。
  • 进入if (nanos < delay || leader != null)分支有一种情况是nanos < delay,这说明当前线程肯定等不到队首元素了,但这里还是继续等待awaitNanos(nanos),因为完全有可能在nanos时间内加入一个delay时间更小的元素,小到当前线程又可以等到队首元素。
    • 如果每个线程进入if (nanos < delay || leader != null)分支都是因为nanos < delay,那么将没有线程是Leader。(假设只调用超时poll)
  • nanos >= delay且leader == null时,直接调用awaitNanos(delay),因为阻塞时间取个最小值即可。
  • available.awaitNanos(delay)有可能因signal而提前返回,也可能刚好到时返回,也可能因为迟迟抢不到独占锁(毕竟是非公平的ReentrantLock)而消耗更多的时间。也就是说,随着时间流逝,available.awaitNanos(delay)的返回值范围为 d e l a y ∼ − ∞ delay \sim -\infty delay,现在delay - timeLeft,所以范围变成 ( d e l a y − d e l a y ) ∼ ( d e l a y − ( − ∞ ) ) (delay - delay) \sim (delay-(-\infty)) (delaydelay)(delay()),也就是 0 ∼ + ∞ 0\sim +\infty 0+。也就是说,delay - timeLeftawaitNanos消耗的时间,所以nanos要减去消耗的时间,如果下一次循环还会再次阻塞,那么将以减去的新值来阻塞。

迭代器

和PriorityBlockingQueue一样,迭代器初始化时,传入一个当前DelayQueue队列的数组快照。所以也是弱一致性的。

总结

  • DelayQueue和PriorityBlockingQueue一样是一个优先队列。
  • 队列元素只能放置Delayed对象,而且只有元素到期后才能将其出队。
  • DelayQueue需要依赖于元素对Delayed接口正确实现。
  • Leader-Follower模式减小了无意义的线程唤醒,只在Leader退出出队函数时唤醒Follower,以避免Follower线程一直阻塞在AQS条件队列里。

http://www.niftyadmin.cn/n/768173.html

相关文章

JUC集合类 LinkedTransferQueue源码解析 JDK8

文章目录前言LinkedTransferQueue概述术语解释xfer交易后来的一方交易先来的一方tryAppendtryMatchDataunsplice为什么是普通语义而不是CAS内部删除 remove迭代器总结前言 LinkedTransferQueue是一种特殊的无界阻塞队列&#xff0c;它提供一种Transfer的功能&#xff0c;用以保…

JUC集合类 SynchronousQueue源码解析 JDK8

文章目录前言Transferer抽象类TransferStack节点成员节点类型TransferStack成员transfer方法awaitFulfillcleanTransferQueue节点成员节点类型TransferQueue成员transfer方法awaitFulfillclean无效操作总结前言 SynchronousQueue其实就是LinkedTransferQueue的升级版&#xff…

JUC框架 从Runnable到Callable到FutureTask 使用浅析

前言 本文旨在简单讲解Runnable、Callable、FutureTask这几个线程执行相关的接口和类。为后面FutureTask源码讲解作铺垫。 JUC框架 系列文章目录 Runnable FunctionalInterface public interface Runnable {public abstract void run(); }我们知道创建线程有两种方式&#…

JUC框架 FutureTask源码解析 JDK8

文章目录前言状态消费者链表成员构造器实现Runnable接口实现Future接口普通get、超时getcancel、isCancelledisDone普通写和CAS写混合总结前言 FutureTask的使用方法已经在上一篇进行了讲解&#xff0c;其实它和SynchronousQueue很像&#xff0c;执行task的线程是生产者&#…

JUC框架 CompletableFuture源码解析 JDK8

文章目录前言基础设施创建CompletableFutureCompletableFuture成员Completion内部类AltResult内部类Signaller内部类从supplyAsync thenApply(thenApplyAsync)理解supplyAsyncthenApply(thenApplyAsync)UniApply内部类#tryFireCompletableFuture#uniApply谁执行了当前stage&am…

JUC线程池 ThreadPoolExecutor源码解析 JDK8

文章目录前言成员和构造器线程池状态构造器execute提交方法addWorker尝试新起线程addWorkerFailed回滚WorkertryTerminate 帮助TerminateinterruptIdleWorkersWorker对AQS的实现对Runnable的实现runWorkergetTaskprocessWorkerExit独占锁的作用核心线程预启动关闭线程池awaitTe…

JUC线程池 ScheduledThreadPoolExecutor源码解析 JDK8

文章目录前言成员和构造器成员构造器提交任务ScheduledExecutorService接口——提交延时任务继承部分ScheduledExecutorService接口——提交周期任务delayedExecuteScheduledFutureTask内部类成员和构造器run方法分析周期任务执行过程关闭线程池优先队列总结前言 ThreadPoolEx…

LeetCode-SQL练习题总结(MySQL实现)

文章目录前言176. 第二高的薪水177. 第N高的薪水单次查询子查询自连接自定义变量178. 分数排名180. 连续出现的数字181. 超过经理收入的员工182. 查找重复的电子邮箱183. 从不订购的客户184. 部门工资最高的员工子查询内连接子查询185. 部门工资前三高的所有员工子查询判断工资…