JUC - CountDownLach原理分析

 CountDownLach闭锁背景

CountDownLatch是原理在Java1.5被引入,跟它一起被引入的分析工具类还有CyclicBarrier、Semaphore、原理ConcurrenthashMap和BlockingQueue。分析 在java.util.cucurrent包下。原理

概念

CountDownLatch这个类使一个线程等待其它线程各自执行完毕后再执行。分析 是原理通过一个计数器来实现的,计数器的分析初始值是线程的数量。每当一个线程执行完毕后,原理计数器的站群服务器分析值就-1,当计数器的原理值为0时,表示所有线程都执行完毕,分析然后在闭锁上等待的原理线程就可以恢复工作来。

源码

countDownLatch类中只提供了一个构造器 public CountDownLatch(int count) {    if (count < 0) throw new IllegalArgumentException("count < 0");      this.sync = new Sync(count); }  类中有三个方法是分析最重要的 // 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 public void await() throws InterruptedException {          sync.acquireSharedInterruptibly(1);     }//和await()方法类似,原理只不过等待一定的时间后count值还没变为0的化就会继续执行 public boolean await(long timeout, TimeUnit unit)         throws InterruptedException {         return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));     }//将count值减1 public void countDown() {         sync.releaseShared(1);     } 

示例普通示例:

public class CountDownLatchTest {      public static void main(String[] args) {          final CountDownLatch latch = new CountDownLatch(2);         System.out.println("主线程开始执行…… ……");         //第一个子线程执行         ExecutorService es1 = Executors.newSingleThreadExecutor();         es1.execute(new Runnable() {              @Override             public void run() {                  try {                      Thread.sleep(3000);                     System.out.println("子线程:"+Thread.currentThread().getName()+"执行");                 } catch (InterruptedException e) {                      e.printStackTrace();                 }                 latch.countDown();             }         });         es1.shutdown();         //第二个子线程执行         ExecutorService es2 = Executors.newSingleThreadExecutor();         es2.execute(new Runnable() {              @Override             public void run() {                  try {                      Thread.sleep(3000);                 } catch (InterruptedException e) {                      e.printStackTrace();                 }                 System.out.println("子线程:"+Thread.currentThread().getName()+"执行");                 latch.countDown();             }         });         es2.shutdown();         System.out.println("等待两个线程执行完毕…… ……");         try {              latch.await();         } catch (InterruptedException e) {              e.printStackTrace();         }         System.out.println("两个子线程都执行完毕,继续执行主线程");     } } 

结果集:

主线程开始执行…… …… 等待两个线程执行完毕…… ……子线程:pool-1-thread-1执行子线程:pool-2-thread-1执行两个子线程都执行完毕,继续执行主线程 

模拟并发示例:

public class Parallellimit {      public static void main(String[] args) {          ExecutorService pool = Executors.newCachedThreadPool();        CountDownLatch cdl = new CountDownLatch(100);         for (int i = 0; i < 100; i++) {              CountRunnable runnable = new CountRunnable(cdl);             pool.execute(runnable);        }    }} class CountRunnable implements Runnable {      private CountDownLatch countDownLatch;     public CountRunnable(CountDownLatch countDownLatch) {          this.countDownLatch = countDownLatch;     }    @Override     public void run() {          try {              synchronized (countDownLatch) {                 /*** 每次减少一个容量*/                 countDownLatch.countDown();                System.out.println("thread counts = " + (countDownLatch.getCount()));             }            countDownLatch.await();             System.out.println("concurrency counts = " + (100 - countDownLatch.getCount()));         } catch (InterruptedException e) {              e.printStackTrace();        }    }} 

源码分析

public class CountDownLatch {      //继承AQS来实现他的高防服务器模板方法(tryAcquireShared,tryReleaseShared)     private static final class Sync extends AbstractQueuedSynchronizer {        //计数个数Count         Sync(int count) {              setState(count);        }        int getCount() {              return getState();         }      //AQS方法getState(),返回同步状态,这里指计数器值        protected int tryAcquireShared(int acquires) {              return (getState() == 0) ? 1 : -1;         }       //循环+cas重试 直到计数器为0 跳出,则release(实现aqs共享模式释放方法)         protected boolean tryReleaseShared(int releases) {              // Decrement count; signal when transition to zero             for (;;) {                  int c = getState();                 if (c == 0)                     return false;                 int nextc = c-1;                 if (compareAndSetState(c, nextc))                     return nextc == 0;             }        }    }    private final Sync sync;   //实例化     public CountDownLatch(int count) {          if (count < 0) throw new IllegalArgumentException("count < 0");         this.sync = new Sync(count);    }    public void await() throws InterruptedException {         sync.acquireSharedInterruptibly(1);     }  //带有一个超时时间的awit    public boolean await(long timeout, TimeUnit unit)        throws InterruptedException {         return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));     }      public void countDown() {         sync.releaseShared(1);     }    public long getCount() {         return sync.getCount();     }} 

总结CountDownLatch 和 Semaphore 一样都是共享模式下资源问题,这些源码实现AQS的模版方法,然后使用CAS+循环重试实现自己的功能。在RT多个资源调用,或者执行某种操作依赖其他操作完成下可以发挥这个计数器的作用。

滇ICP备2023000592号-31