JUC - CountDownLach原理分析
人工智能 2025-10-09 11:54:44
0
CountDownLach闭锁背景
CountDownLatch是原理在Java1.5被引入,跟它一起被引入的分析工具类还有CyclicBarrier、Semaphore、原理ConcurrenthashMap和BlockingQueue。分析 在java.util.cucurrent包下。原理概念
CountDownLatch这个类使一个线程等待其它线程各自执行完毕后再执行。分析 是原理通过一个计数器来实现的,计数器的分析初始值是线程的数量。每当一个线程执行完毕后,原理计数器的站群服务器分析值就-1,当计数器的原理值为0时,表示所有线程都执行完毕,分析然后在闭锁上等待的原理线程就可以恢复工作来。源码
countDownLatch类中只提供了一个构造器 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } 类中有三个方法是分析最重要的 // 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }//和await()方法类似,原理只不过等待一定的时间后count值还没变为0的化就会继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }//将count值减1 public void countDown() { sync.releaseShared(1); }示例普通示例:
public class CountDownLatchTest { public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(2); System.out.println("主线程开始执行…… ……"); //第一个子线程执行 ExecutorService es1 = Executors.newSingleThreadExecutor(); es1.execute(new Runnable() { @Override public void run() { try { Thread.sleep(3000); System.out.println("子线程:"+Thread.currentThread().getName()+"执行"); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); } }); es1.shutdown(); //第二个子线程执行 ExecutorService es2 = Executors.newSingleThreadExecutor(); es2.execute(new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程:"+Thread.currentThread().getName()+"执行"); latch.countDown(); } }); es2.shutdown(); System.out.println("等待两个线程执行完毕…… ……"); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("两个子线程都执行完毕,继续执行主线程"); } }结果集:
主线程开始执行…… …… 等待两个线程执行完毕…… ……子线程:pool-1-thread-1执行子线程:pool-2-thread-1执行两个子线程都执行完毕,继续执行主线程模拟并发示例:
public class Parallellimit { public static void main(String[] args) { ExecutorService pool = Executors.newCachedThreadPool(); CountDownLatch cdl = new CountDownLatch(100); for (int i = 0; i < 100; i++) { CountRunnable runnable = new CountRunnable(cdl); pool.execute(runnable); } }} class CountRunnable implements Runnable { private CountDownLatch countDownLatch; public CountRunnable(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { try { synchronized (countDownLatch) { /*** 每次减少一个容量*/ countDownLatch.countDown(); System.out.println("thread counts = " + (countDownLatch.getCount())); } countDownLatch.await(); System.out.println("concurrency counts = " + (100 - countDownLatch.getCount())); } catch (InterruptedException e) { e.printStackTrace(); } }}源码分析
public class CountDownLatch { //继承AQS来实现他的高防服务器模板方法(tryAcquireShared,tryReleaseShared) private static final class Sync extends AbstractQueuedSynchronizer { //计数个数Count Sync(int count) { setState(count); } int getCount() { return getState(); } //AQS方法getState(),返回同步状态,这里指计数器值 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //循环+cas重试 直到计数器为0 跳出,则release(实现aqs共享模式释放方法) protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; //实例化 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //带有一个超时时间的awit public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public long getCount() { return sync.getCount(); }}总结CountDownLatch 和 Semaphore 一样都是共享模式下资源问题,这些源码实现AQS的模版方法,然后使用CAS+循环重试实现自己的功能。在RT多个资源调用,或者执行某种操作依赖其他操作完成下可以发挥这个计数器的作用。