Java中的 CyclicBarrier详解

目录

  • CyclicBarrier简介
  • CyclicBarrier源码分析
    • 类的继承关系
    • ​类的属性
    • 类的构造函数
  • 核心函数 - dowait函数
    • 核心函数 - nextGeneration函数

      CyclicBarrier简介

      对于CountDownLatch,其他线程为trip = lock.newCondition(); /** The number of parties */ // 参与的线程数量 private final int parties; /* The command to run when tripped */ // 由最后一个进入 barrier 的线程执行的操作 private final Runnable barrierCommand; /** The current generation */ // 当前代 private Generation generation = new Generation(); // 正在等待进入屏障的线程数量 private int count; }

      该属性有一个为ReentrantLock对象,有一个为Condition对象,而Condition对象又是基于AQS的,所以,归根到底,底层还是由AQS提供支持。

      Java中的 CyclicBarrier详解

      类的构造函数

      public CyclicBarrier(int parties, Runnable barrierAction) { // 参与的线程数量小于等于0,抛出异常 if (parties <= 0) throw new IllegalArgumentException(); // 设置parties this.parties = parties; // 设置count this.count = parties; // 设置barrierCommand this.barrierCommand = barrierAction; }

      该构造函数可以指定关联该CyclicBarrier的线程数量,并且可以指定在所有线程都进入屏障后的执行动作,该执行动作由最后一个进行屏障的线程执行。

      public CyclicBarrier(int parties) { // 调用含有两个参数的构造函数 this(parties, null); }

      该构造函数仅仅执行了关联该CyclicBarrier的线程数量,没有设置执行动作。

      核心函数 - dowait函数

      此函数为CyclicBarrier类的核心函数,CyclicBarrier类对外提供的await函数在底层都是调用该了doawait函数,

      private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 保存当前锁 final ReentrantLock lock = this.lock; // 锁定 lock.lock(); try { // 保存当前代 final Generation g = generation; if (g.broken) // 屏障被破坏,抛出异常 throw new BrokenBarrierException(); if (Thread.interrupted()) { // 线程被中断 // 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用 breakBarrier(); // 抛出异常 throw new InterruptedException(); } // 减少正在等待进入屏障的线程数量 int index = --count; if (index == 0) { // 正在等待进入屏障的线程数量为0,所有线程都已经进入 // 运行的动作标识 boolean ranAction = false; try { // 保存运行动作 final Runnable command = barrierCommand; if (command != null) // 动作不为空 // 运行 command.run(); // 设置ranAction状态 ranAction = true; // 进入下一代 nextGeneration(); return 0; } finally { if (!ranAction) // 没有运行的动作 // 损坏当前屏障 breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 无限循环 for (;;) { try { if (!timed) // 没有设置等待时间 // 等待 trip.await(); else if (nanos > 0L) // 设置了等待时间,并且等待时间大于0 // 等待指定时长 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { // 等于当前代并且屏障没有被损坏 // 损坏当前屏障 breakBarrier(); // 抛出异常 throw ie; } else { // 不等于当前带后者是屏障被损坏 // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. // 中断当前线程 Thread.currentThread().interrupt(); } } if (g.broken) // 屏障被损坏,抛出异常 throw new BrokenBarrierException(); if (g != generation) // 不等于当前代 // 返回索引 return index; if (timed && nanos <= 0L) { // 设置了等待时间,并且等待时间小于0 // 损坏屏障 breakBarrier(); // 抛出异常 throw new TimeoutException(); } } } finally { // 释放锁 lock.unlock(); } }

      核心函数 - nextGeneration函数

      此函数在所有线程进入屏障后会被调用,即生成下一个版本,所有线程又可以重新进入到屏障中,

      private void nextGeneration() { // signal completion of last generation // 唤醒所有线程 trip.signalAll(); // set up next generation // 恢复正在等待进入屏障的线程数量 count = parties; // 新生一代 generation = new Generation(); }

      在此函数中会调用AQS的signalAll方法,即唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。

      public final void signalAll() { if (!isHeldExclusively()) // 不被当前线程独占,抛出异常 throw new IllegalMonitorStateException(); // 保存condition队列头节点 Node first = firstWaiter; if (first != null) // 头节点不为空 // 唤醒所有等待线程 doSignalAll(first); }

      到此这篇关于Java中的CyclicBarrier详解的文章就介绍到这了,更多相关Java中的CyclicBarrier内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

      发布于 2022-04-22 18:01:04
      收藏
      分享
      海报
      0 条评论
      24
      上一篇:ElasticSearch如何设置某个字段不分词浅析 下一篇:Java8 Stream 流常用方法合集
      目录

        0 条评论

        本站已关闭游客评论,请登录或者注册后再评论吧~

        忘记密码?

        图形验证码