JUC框架 CyclicBarrier源码解析 JDK8

news/2024/5/20 7:19:03 标签: java, CyclicBarrier, JUC, 多线程, 并发

文章目录

  • 前言
  • 与CountDownLatch的区别
  • 重要成员
  • 构造器
  • 辅助方法
    • nextGeneration
    • breakBarrier
    • reset
  • await
  • 从问题分析深入理解CyclicBarrier
    • 有几种线程在执行?
    • 一代线程们通过barrier的完整流程是什么?
    • 前n-1个线程发生中断或超时的流程是什么?
    • 第n个线程在执行nextGeneration()之前,前面的线程中断或超时会怎么样?
    • 第n个线程在执行nextGeneration()后,前面的线程中断或超时会怎么样?
    • g == generation成立,意味着什么?
    • CyclicBarrier使用的非公平锁有什么影响?
    • 第n+1个线程获得到的Generation局部变量,会不会是第一代的?
    • 条件队列上的node的线程,肯定是同一“代”的吗?
    • 同步队列上的node的线程,肯定是同一“代”的吗?
    • 如果generation成员已经broken,该怎么办?
    • 调用await(0, TimeUnit.SECONDS)可能导致TimeoutException?
  • 总结

前言

上一篇文章CountDownLatch源码解析我们学习了CountDownLatch,接下来我们来分析一下和它用法很类似的CyclicBarrier

JUC框架 系列文章目录

与CountDownLatch的区别

在CountDownLatch中,有两种毫无关系的线程:一是执行countDown的线程,用来减小计数器count,肯定不会阻塞;二是执行await的线程,用来阻塞等待直到count为0。

但在CyclicBarrier中,每个执行await的线程都将充当上面两种角色,即先令count减一再阻塞等待。

我们举个栗子,比如山地景区里的缆车,每个缆车可以装4个人,为了不浪费位置,前3个人都需要等待第4个人的到来(线程先阻塞),只有每次都要等到齐了4个人后,才让他们一起上缆车(唤醒前面阻塞的线程),当然在上缆车之前第4个人要负责给大家拴好安全带(执行构造器传入的barrierAction参数)后才让大家上缆车。

重要成员

java">    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

我们将成员分成三组:

1.计数器相关:

java">private final int parties;
private int count;
  • parties成员。代表每次通过barrier一共需要的线程数量,它是final的,被构造器所赋值被不再改变,因为每次通过barrier后需要靠它重新复位计数器。
  • count成员。Number of parties still waiting,当前还需要多少个线程到达barrier,它的初始值是parties,通过barrier时它肯定变成了0。
  • 简单的说,到达barrier是指,CyclicBarrier的使用者因调用await而阻塞;通过barrier是指,CyclicBarrier的使用者从await处返回。

2.同步组件相关:

java">private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();

CyclicBarrier的线程同步是依赖于 ReentrantLock 和 Condition 的。到达barrier的线程开始都会进入condition queue后阻塞,等待signal的到来;这些线程被唤醒后,将进入sync queue中排队抢锁,抢到锁的线程才能退出CyclicBarrier#await

所以看完本文也就相当于对Condition的实现进行了一遍复习,阅读本文也需要读者比较了解Condition的实现。

3.CyclicBarrier自定义需要:

java">private Generation generation = new Generation();
private final Runnable barrierCommand;

CyclicBarrier是可以重复使用的,每一组一起通过barrier的线程我们称它们为一代Generation,这就像是一种版本标记。在AtomicStampedReference源码里面,我们通过一个int值来代表当前的版本,而Generation却不需要有一个int值成员来作为版本信息,因为比较的时候,我们都是直接通过g1 == g2比较两个Generation对象的地址值来判断版本是否为同一个版本,也就是说,每new一次Generation就代表是新的一代。

barrierCommand是每一代线程们通过barrier之前要做的事情,它虽然是一个Runnable对象,但不要误会,我们执行它的run方法时根本不会新起一个线程来执行,这里你甚至可以将这句的Runnable随便变成一个具有run方法的类都可以。当然,barrierCommand可以为null,这说明通过barrier不需要做什么事情。

构造器

java">public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

它有两个构造器,第一个版本也是调用第二个,但没有传入那个Runnable对象而已。

可见CyclicBarrier的计数器必须至少为1。如果可以为0,也没有意义。

辅助方法

在讲解await()之前,先讲解一下这几个辅助方法,顺便加深理解CyclicBarrier的自定义成员。

nextGeneration

前面提到,每一组一起通过barrier的线程我们称它们为一代Generation,每new一次Generation对象就代表是新的一代。在CyclicBarrier中,我们使用nextGeneration来开启新的一代,自然它里面也会去newGeneration对象。

java">private void nextGeneration() {
    // 唤醒条件队列上的这一代线程
    trip.signalAll();
    // 恢复count值,因为需要重新计数
    count = parties;
    generation = new Generation();
}

nextGeneration用来开启新的一代,它除了new了Generation,还需要唤醒条件队列上的这一代线程,恢复count值以重新计数。正常流程下,一代中的最后一个调用CyclicBarrier#await的线程将执行此方法。

既然执行了Condition#signalAll方法,说明执行nextGeneration之前就已经获得了lock锁,而且nextGeneration的末尾也没有执行ReentrantLock#unlock方法,这说明,nextGeneration的整个过程都是持有锁的,不用担心多线程访问的问题了。(看来不用担心多线程访问带给我们的心智负担了~)

breakBarrier

breakBarrier将打破计数器的限制,不管当前到达barrier的线程够不够,直接让 当前已到达barrier 的线程们通过barrier。

java">private void breakBarrier() {
    // 标记这一代是broken的
    generation.broken = true;
    // 恢复count值
    count = parties;
    // 唤醒条件队列上的这一代线程(数量可能不够parties个)
    trip.signalAll();
}

这和nextGeneration干的事情几乎一样,除了它没有去newGeneration对象,只是标记当前Generation对象是broken的。重点在于,它不会开启新的一代。

同理,breakBarrier的全程都是持有锁的。

reset

reset会将CyclicBarrier恢复成初始状态,里面直接调用了上面的两个方法。从下面可见,nextGenerationbreakBarrier全称都是持有锁的,不过这二者干的事情有点重复,不过这样重复干的事情是不要紧的。

java">public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

如果有线程在await()的过程中发现自己这一代是broken的,则会抛出BrokenBarrierException异常。

await

现在来看CyclicBarrier最重要的部分await,它集齐了两个功能:

  1. 计数器减一
  2. 阻塞等待,直到线程到齐(tripped)、BrokenBarrier(broken)、中断(interrupted)、超时(timed out)。

await方法有两个重载版本,普通版本和超时版本,它们本质都调用同一个函数dowait

java">public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // 从dowait(false, 0L)的逻辑来说是不可能抛出TimeoutException的,因为参数为false
    }
}

public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

从两个函数抛出的异常就可以知道,这二者退出await的形式可能是下面这四种:

正常退出await
(即有parties个线程到达了barrier)
因中断而退出
(抛出InterruptedException)
因BrokenBarrier而退出
(抛出BrokenBarrierException)
因超时而退出
(抛出TimeoutException)
await()-
await(long timeout, TimeUnit unit)

注意还有返回值,代表还需要多少个线程到达barrier,比如第一个线程调用await后,返回的是parties-1;最后一次线程返回0。

java">    private int dowait(boolean timed, long nanos)//timed参数为true,才可能抛出TimeoutException
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();//整个执行过程必须先获得锁
        try {
            // 这个Generation局部变量保存起来很重要,因为CyclicBarrier的generation成员在通过barrier后会替换为新对象
            // 获得的Generation对象肯定是当前线程应该在的“代”
            final Generation g = generation;

            if (g.broken)  //如果当前线程在调用dowait前,当代generation就是broken的
            	//之所以我敢肯定说是调用dowait前,是因为所有操作的前提都是要先获得锁
                throw new BrokenBarrierException();

            //如果当前线程在调用dowait前,被中断了
            //被中断说明当前线程取消掉了,而CyclicBarrier本着“一个都不少”的原则,就不会让同一代的其他线程苟活下去
            //所以就要先breakBarrier,再抛出InterruptedException异常。
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

			//到达barrier时,将count减一的值,这个值作为正常返回时的返回值
            int index = --count;
            //如果当前线程是通过barrier所需要的最后一个线程,即之前已经到达了parties-1个线程
            if (index == 0) {  // tripped
                boolean ranAction = false;//用来标记command.run()执行过程中是否出错
                try {
                    final Runnable command = barrierCommand;
                    //当代中最后一个线程,负责执行command.run()
                    if (command != null)
                    	//说明通过barrier前,有事情需要做
                        command.run();
                    ranAction = true;//标记command.run()执行过程中没有出错
                    nextGeneration();//唤醒条件队列的所有线程,开启下一代
                    return 0;
                } finally {
                    if (!ranAction)//command.run()执行过程中出错了,需要执行breakBarrier
                        breakBarrier();
                }
            }
			//执行到这里,说明当前线程肯定不是当代中的最后一个线程(即index不为0)
			//如果最后一个线程执行command.run()没出错,将return 0返回
			//如果最后一个线程执行command.run()出错了,执行finally块后,将返回上层函数并抛出异常
			
            // 循环不会结束,直到tripped, broken, interrupted, or timed out
            for (;;) {
                try {
					// 两种Condition的await都会阻塞,

                    // 如果是普通版本调用的dowait,那么调用Condition的await
                    if (!timed)
                        trip.await();
                    // 如果是超时版本调用的dowait,那么调用Condition的awaitNanos
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//返回值是deadline减去当前时间的差值
                } catch (InterruptedException ie) {
					//执行到这里,说明上面两种Condition的await,被中断而唤醒

					//如果局部变量和成员变量是同一个对象,且当前不是broken的,则先breakBarrier
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                    	//执行到这里有两种情况:
                    	//1. g != generation,说明成员变量已经更新了,此时再去breakBarrier
                    	//   只能影响到新一代的broken成员,所以不能执行breakBarrier
                    	//2. 虽然g == generation,但已经是broken的了。既然barrier已经被打破
                    	//   那就不用再执行breakBarrier了
                    	
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
				//只要自己这一代是broken的,就抛出异常。只有breakBarrier函数会改变broken,这里给出执行breakBarrier的场景:
				//1. 当代的其他线程调用CyclicBarrier#await之前就已经被中断了(强调“其他”是因为,如果是这个原因,不会执行到这里)
				//2. 当代的其他线程调用Condition#await阻塞后被中断(上面的catch块)
				//3. 最后到达barrier的线程执行barrierCommand时出错了
				//4. reset方法被执行,这可以是任意线程
                if (g.broken)
                    throw new BrokenBarrierException();

                // 执行到这里,说明当代g不是broken的,且最后一个线程已经到达barrier
                //(最后一个到达了,会执行nextGeneration更新generation成员的)
                if (g != generation)
                	//此时才可以正常返回
                    return index;

				//执行到这里,说明当代g不是broken的,CyclicBarrier的代也没有更新呢
				//说明最后一个线程还没来,自己就超时了
				//如果发现已经超时,就打破barrier,抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();//就算抛出异常,抛出之前也会释放锁的
        }
    }

主要细节我已经在注释中说明了。

简单总结下:

  • 一般情况下,每个线程开始都令count成员减一,最终返回也是减一后的值。
  • 在每一代中,前parties-1个线程到达barrier时,会执行Condition#await(指两个版本)而阻塞。
  • 在每一代中,第parties个线程到达barrier时,它负责执行barrierCommand,不管是否出错,都将返回上层代码(可能以抛出异常的形式)。
  • 保存的局部变量g一定是当前线程所属的“代”,而CyclicBarrier.generation成员变量则保存当前CyclicBarrier进行到第几“代”了。这两个变量很重要。

CyclicBarrier_273">从问题分析深入理解CyclicBarrier

就算理解了dowait的每行代码的逻辑,可能也不了解整个CyclicBarrier的协作过程,所以我们通过以下几个问题的解答来深入理解CyclicBarrier

有几种线程在执行?

从宏观的角度来看,只有两种:

  • 执行dowait的线程。这些线程想要正常通过barrier。
  • 执行reset的线程。让当代的线程全部作废(抛出BrokenBarrierException),开启下一代。

从Generation的状态来看:

  • 执行breakBarrier的线程。它只会让当代Generation变成broken,但不会开启下一代。注意,一旦broken了,就没法再变回去了,只有靠nextGeneration生成新的Generation了。
  • 执行nextGeneration的线程。生成新的Generation,新生成的Generation状态就不是broken的了。

从线程的阻塞唤醒状态来看:

  • 执行Condition#await的线程。这类线程不是每一代中到达barrier的最后一个线程,所以都会调用Condition#await而阻塞。
  • 执行Condition#signalAll的线程。可以是任意线程。
    • 如果是每一代中到达barrier的最后一个线程,不管执行barrierCommand是否出错,最终都会执行到Condition#signalAll
    • 如果不是每一代中到达barrier的最后一个线程,也可能因为中断或超时,而去执行breakBarrier,进而执行到Condition#signalAll

一代线程们通过barrier的完整流程是什么?

假设每次通过barrier需要n个线程。假设执行barrierCommand不出错

  • 前n-1个线程,都会将count减1,然后调用Condition#await(这里指两个版本)阻塞。前n-1个线程的node此时都处于condition queue上。
  • 第n个线程,将count减到0,假设执行barrierCommand不出错,将调用nextGeneration里的Condition#signalAll,将前n-1个线程的node转移到sync queue上。注意nextGeneration会更新generation成员,也就是说前n-1个线程保存的局部变量g已经和成员变量不相等了。
  • 第n个线程return 0之前,将执行finally块里的lock.unlock(),然后唤醒第1个线程。
  • 第1个线程获得锁后,从Condition#await中退出后,因为if (g != generation) return index;而退出,退出前它将执行finally块里的lock.unlock(),然后唤醒第2个线程。
  • 以此类推,最终前n-1个线程都将从CyclicBarrier#await中正常退出。

以上过程,从使用者的角度来看,就是第n个线程到达barrier后,所有线程都开始重新工作了。

假设执行barrierCommand出错

  • 前n-1个线程,都会将count减1,然后调用Condition#await(这里指两个版本)阻塞。前n-1个线程的node此时都处于condition queue上。
  • 第n个线程,将count减到0,如果执行barrierCommand出错,将调用breakBarrier里的Condition#signalAll,将前n-1个线程的node转移到sync queue上。注意,breakBarrier不会更新generation成员,但会把generation成员变成broken的。
  • 第n个线程在抛出“执行barrierCommand出错”的异常前,将执行finally块里的lock.unlock(),然后唤醒第1个线程。
  • 第1个线程获得锁后,从Condition#await中退出后,因为if (g.broken) throw new BrokenBarrierException();而抛异常,抛异常前它将执行finally块里的lock.unlock(),然后唤醒第2个线程。
  • 以此类推,最终前n-1个线程都将从CyclicBarrier#await中收获一个BrokenBarrierException

以上过程,从使用者的角度来看,似乎到达了所有线程要么都做(每个线程都从CyclicBarrier#await正常返回),要么都不做(每个线程都从CyclicBarrier#await抛出了异常)的目的。

前n-1个线程发生中断或超时的流程是什么?

假设第n个线程还没有到来,前n-1个线程中的任意线程发生中断或超时。

先分析中断:

  • 如果某个线程发生了中断,它从Condition#await处会抛出中断异常。
    • 此时,当前线程的node已经转移到了sync queue(因为Condition#await里的checkInterruptWhileWaiting),然后执行Condition#awaitacquireQueued进行阻塞式的抢锁,退出Condition#await之前根据interruptMode而抛出了异常。具体请看Condition接口的实现。
  • 抛出中断异常被catch住,进入if (g == generation && ! g.broken)分支,执行breakBarrier里的Condition#signalAll,唤醒在条件队列上的所有线程。
  • 其他线程被唤醒后,因为if (g.broken) throw new BrokenBarrierException();抛出BrokenBarrier异常。

再分析超时:

  • 如果某个线程发生了超时,它从Condition#awaitNanos处正常返回但返回值是一个<=的数。
  • 接下来会执行到if (timed && nanos <= 0L)分支,因为前面的分支都没有进入,执行breakBarrier里的Condition#signalAll,唤醒在条件队列上的所有线程。
  • 其他线程被唤醒后,因为if (g.broken) throw new BrokenBarrierException();抛出BrokenBarrier异常。

从上述过程可见,在当代还没有被breakBarrier时,第一个出问题的线程会报告问题原因(中断异常或超时异常),而当代的其他线程则只会报告BrokenBarrierException的原因了。

还有一种情况,当代的其他线程会带有更多信息。考虑上面两种过程,第一个出问题的线程还没有执行到Condition#signalAll,当代其他某一个线程被中断了(因为第一个出问题线程还没释放锁,所以不会从Condition#await处退出,但Condition#await会记录下来这个比signal流程来得更早的中断),等到这某一个线程从Condition#await处抢到锁返回时,肯定会抛出中断异常。然后中断异常被catch,执行if (g == generation && ! g.broken)的else分支,再自我中断一下,最后才抛出BrokenBarrierException。此时,线程抛出BrokenBarrierException的同时还带有中断状态。

第n个线程在执行nextGeneration()之前,前面的线程中断或超时会怎么样?

简单的说,不会怎么样。因为第n个线程在执行nextGeneration()之前,肯定还没有释放锁。而前面的线程因为获得不到锁,还是会在Condition#await里的acquireQueued继续阻塞。

java">            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

如上代码,假设第n个线程执行barrierCommand不出错,在执行nextGeneration()之前还需要一段时间。

既然Condition#await里的acquireQueued只有在第n个线程执行nextGeneration()然后释放锁后才能抢到锁,所以“前面的线程中断或超时”的具体结果,就交给下一个问题分析。

第n个线程在执行nextGeneration()后,前面的线程中断或超时会怎么样?

第n个线程都执行完了nextGeneration(),说明执行barrierCommand没有出错,此时generation成员已经被更新,也就是说,前面的线程在判断g == generation肯定不成立。

更重要的是,一旦第n个线程都执行完了nextGeneration(),旧generationbroken属性就不可能再被改变了,因为breakBarrier只能broken当前的generation成员

而既然第n个线程能执行nextGeneration(),那说明没有进入之前的if (g.broken) throw new BrokenBarrierException();分支,第n个线程执行时,那个旧generationbroken属性肯定为false

假如前面的线程被中断:

  • 当前线程从Condition#await处唤醒,catch处中断异常,由于g != generation而进入if (g == generation && ! g.broken)的else分支,即自我中断一下。
  • 局部变量g保存着当前线程的代,根据上面分析它肯定不是broken的。所以不会if (g.broken) throw new BrokenBarrierException();
  • 因为if (g != generation)成立,而正常返回。

前面的线程超时的流程同理,只不过第一步不会进入catch处中,然后直接第二步。

java">				//执行到这里,说明g不是broken的

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }

这也解释了为什么dowait的最后两个判断是这个顺序。只有超时事件在第n个线程到来之前,即当前线程所属的代和CyclicBarriergeneration成员是同一个对象时,才需要去响应超时,抛出超时异常。

现在回答这个问题:对于每一代来说,第n个线程在执行nextGeneration()后,前面的线程中断或超时不会响应而抛出异常,因为第n个线程已经顺利通过了barrier,前n-1个线程迟早也会顺利通过的。

g == generation成立,意味着什么?

意味着当前这一代,第n个线程还没有到达barrier,如果它到达了,就会执行nextGeneration()的,这个条件也就不会成立。

CyclicBarrier_382">CyclicBarrier使用的非公平锁有什么影响?

CyclicBarrier的构造器中可见,使用的是ReentrantLock的非公平锁。

考虑这种场景:

  • 第n个线程执行完nextGeneration()后,执行lock.unlock()释放锁并唤醒第1个线程。
  • 第1个线程从Condition#await里的LockSupport.park(this)处被唤醒,然后通过acquireQueued(node, savedState)开始抢锁。
  • 第n+1个线程(这属于第二代)执行CyclicBarrier#await里的lock.lock(),也开始抢锁。
  • 本来第1个线程的node处于sync queue中的第一个位置(head后继),但由于是非公平抢锁,也有可能第n+1个线程抢到锁。

虽然第n+1个线程抢到锁后,也是调用Condition#await后马上释放锁,但如果这种非公平方式多次抢占成功,可能会造成第一代的线程们从CyclicBarrier#await处返回得比较慢。

第n+1个线程获得到的Generation局部变量,会不会是第一代的?

如果第n个线程顺利通过了barrier(执行barrierCommand没出错),那么不会,肯定是第二代Generation。
如果第n个线程执行barrierCommand出错,那么第n+1个线程获得的是第一代Generation,但第n+1个线程马上会抛出BrokenBarrierException,所以不用考虑。

条件队列上的node的线程,肯定是同一“代”的吗?

是的。不管是第n个线程执行了nextGeneration,还是reset执行的nextGeneration,都会先将用signalAll把条件队列清空,再更新generation成员。此后,第n+1个线程调用CyclicBarrier#await获得的局部变量肯定是另一个Generation成员了。

同步队列上的node的线程,肯定是同一“代”的吗?

不是。比如第2n+1个线程调用CyclicBarrier#await获得的局部变量已经是第三代Generation了,但可能sync queue的head后继还是第一代的线程node。

如果generation成员已经broken,该怎么办?

因为没有方法能让generation成员的broken属性从true变回false,所以只有靠reset方法来更新generation成员,不然这个CyclicBarrier就一直都不可用了(线程一调用await()就马上抛出BrokenBarrierException)。

所以我们在发现有BrokenBarrierException抛出后,应该去调用reset方法来重置CyclicBarrier,反正这一代的线程们迟早也会抛出BrokenBarrierException

调用await(0, TimeUnit.SECONDS)可能导致TimeoutException?

分两种情况:

如果是前n-1个线程的任意线程通过调用await(0, TimeUnit.SECONDS)到达的barrier,流程如下:

  • 执行到for循环,由于timed为true,且nanos为0,所以根本不会阻塞。
  • 假设前面两个分支都不会进入,所以最终进入if (timed && nanos <= 0L)分支,而抛出TimeoutException
  • 这还导致它之前的其他线程被唤醒后,会抛出BrokenBarrierException

这从语义上讲得通,前n-1个线程要求等待0单位的时间,时间到了(因为是0,所以实际上根本没等),发现第n个线程还没有到达barrier(等价于g == generation,因为如果到达了会nextGeneration从而更新generation成员的),所以就抛出TimeoutException

如果是第n个线程通过调用await(0, TimeUnit.SECONDS)到达的barrier,那么不会发生上面这种异常情况,因为直接进入了if (index == 0)分支。

总结

  • CyclicBarrier类似于CountDownLatch但又不同,在CyclicBarrier里,每个线程既要CountDown减小计数器,也要阻塞等待直到count为0(即等待线程到齐)。
  • CyclicBarrier基于独占锁、同步队列、条件队列而实现。
  • CyclicBarrier可以重复使用,每有parties个线程到达barrier,开启新的一代。然后旧的一代线程就会相继从CyclicBarrier#await退出。
  • CyclicBarrier实现了all-or-none breakage model的原则,同一代线程要么都正常返回CyclicBarrier#await,要么都从CyclicBarrier#await抛出异常。如果一个线程因为中断、超时或执行barrierCommand而出错,将抛出用于解释原因的异常,那么就broken掉CyclicBarrier,同一代的其他线程之后会抛出BrokenBarrierException

http://www.niftyadmin.cn/n/768194.html

相关文章

JUC框架 Semaphore源码解析 JDK8

文章目录前言重要成员构造器获取信号量&#xff08;减小state&#xff09;非公平实现的tryAcquireShared公平实现的tryAcquireShared释放信号量&#xff08;增加state&#xff09;工具方法tryAcquirereducePermitsdrainPermits总结前言 Semaphore在多线程协作中常常用来控制公…

ReentrantReadWriteLock的readerShouldBlock与apparentlyFirstQueuedIsExclusive 深入理解读锁的非公平实现

文章目录前言writerShouldBlock的非公平实现readerShouldBlock的非公平实现写锁无限等待 indefinite writer starvationheuristic启发式地防止new reader总结前言 在ReentrantReadWriteLock的读锁或写锁的获取过程中&#xff0c;在CAS修改同步器状态之前&#xff0c;会使用rea…

JUC框架 ReentrantReadWriteLock源码解析 JDK8

文章目录前言重要成员内部类关系构造器Sync的成员同步器状态的划分读锁计数部分写锁的获取和释放写锁的获取写锁的释放读锁的获取和释放读锁的获取读锁的释放锁降级总结前言 ReentrantReadWriteLock是我阅读了AQS源码以来最感兴趣的类&#xff0c;因为它不像别的JUC构件只使用…

JUC集合类 CopyOnWriteArrayList源码解析 JDK8

文章目录前言核心成员常用方法getsetaddremoveremove(int index)remove(Object o)index > lenindex < lenfindIndex代码块之后cleartoArray迭代器总结前言 CopyOnWriteArrayList 是一种写时复制的ArrayList&#xff0c;它将读操作和写操作的情形区分开来&#xff0c;并在…

JUC集合类 CopyOnWriteArraySet源码解析 JDK8

文章目录前言与CopyOnWriteArrayList不同之处addIfAbsentaddAllAbsent总结前言 类似于上一篇讲的CopyOnWriteArrayList&#xff0c;CopyOnWriteArraySet可以认为是一个写时复制的HashSet。 但CopyOnWriteArraySet的底层实现完全依赖了CopyOnWriteArrayList&#xff0c;它持有…

JUC集合类 ConcurrentSkipListMap源码解析 JDK8

文章目录前言源码注释术语节点定义构造器get 查找操作返回情况findPredecessorput 插入操作假设新建层数没有超过最大层数假设新建层数超过了最大层数返回情况remove 删除操作findNodetryReduceLevel返回情况marker存在的必要性如果marker不存在marker存在时总结前言 Concurre…

ThreadLocalRandom#getProbe #advanceProbe浅析

前言 Briefly, a thread’s “probe” value is a non-zero hash code that (probably) does not collide with other existing threads with respect to any power of two collision space. When it does collide, it is pseudo-randomly adjusted (using a Marsaglia XorShif…

JUC集合类 ConcurrentHashMap源码解析 JDK8

文章目录前言常量成员节点类构造器put 插入操作加锁情况红黑树的binCount固定为2返回情况spreadinitTablehelpTransferresizeStampsizeCtl的低16bit退出循环的条件treeifyBintryPresizeaddCount计数部分计数部分结束时扩容部分CAS失败影响扩容fullAddCountwasUncontended的作用…