1. CountDownLatch2 为什么已经有了CountDownLatch
,还要自定义一个CountDownLatch2
呢?
下面先介绍下CountDownLatch
定义 :CountDownLatch是Java并发编程中的同步工具类,主要用于协调多个线程的执行顺序,允许一个或多个线程等待其他线程完成操作后再继续执行 。其核心机制:通过计数器实现阻塞与唤醒,计数器初始值为任务线程数,每当一个线程完成任务时计数器减1,当计数器归零时等待线程被唤醒。
核心机制与原理 :
计数器初始化 :创建时设定初始计数值(如等待的线程数),该值只能被减少,不可重置。
await()方法 :调用此方法的线程将被阻塞,直到计数器变为0。
countDown()方法 :每调用一次计数器减1,当减至0时唤醒所有等待线程。
典型应用场景 :
主线程等待子线程完成 :例如服务启动时等待所有组件加载完毕。
多线程并发测试 :通过统一触发多个线程模拟高并发场景。
任务分阶段执行 :如“赛跑”场景中,等待所有选手准备就绪后同时开始
代码示例 :
1 2 3 4 5 6 7 8 9 10 CountDownLatch latch = new CountDownLatch (3 );for (int i = 0 ; i < 3 ; i++) { new Thread (() -> { latch.countDown(); }).start(); } latch.await(); System.out.println("所有子线程完成" );
但是,CountDownLatch的计数器在初始化后不能被重置,一旦计数器的值达到0,它就不能再次使用 。所以我们定义了CountDownLatch2
,能够重复使用的同步器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class CountDownLatch2 { private final Sync sync; public CountDownLatch2 (int count) { if (count < 0 ) throw new IllegalArgumentException ("count < 0" ); this .sync = new Sync (count); } public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public String toString () { return super .toString() + "[Count = " + sync.getCount() + "]" ; } private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L ; private final int startCount; Sync(int count) { this .startCount = count; setState(count); } int getCount () { return getState(); } @Override protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } @Override protected boolean tryReleaseShared (int releases) { for (; ; ) { int c = getState(); if (c == 0 ) return false ; int nextc = c - 1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } protected void reset () { setState(startCount); } } }
CountDownLatch2
是如何实现的重复使用呢?相比CountDownLatch
额外提供了reset()
方法重置初始计数器。原理里很简单,Sync
同步类通过setState(startCount)
实现修改private volatile int state
同步状态值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L ; private final int startCount; Sync(int count) { this .startCount = count; setState(count); } int getCount () { return getState(); } @Override protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } @Override protected boolean tryReleaseShared (int releases) { for (; ; ) { int c = getState(); if (c == 0 ) return false ; int nextc = c - 1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } protected void reset () { setState(startCount); } }
2. ServiceThread 如下介绍的是RocketMq使用自定义的CountDownLatch
实现的单线程永动机。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 public abstract class ServiceThread implements Runnable { protected static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private static final long JOIN_TIME = 90 * 1000 ; protected Thread thread; protected final CountDownLatch2 waitPoint = new CountDownLatch2 (1 ); protected volatile AtomicBoolean hasNotified = new AtomicBoolean (false ); protected volatile boolean stopped = false ; protected boolean isDaemon = false ; private final AtomicBoolean started = new AtomicBoolean (false ); public ServiceThread () { } public abstract String getServiceName () ; public void start () { log.info("Try to start service thread:{} started:{} lastThread:{}" , getServiceName(), started.get(), thread); if (!started.compareAndSet(false , true )) { return ; } stopped = false ; this .thread = new Thread (this , getServiceName()); this .thread.setDaemon(isDaemon); this .thread.start(); log.info("Start service thread:{} started:{} lastThread:{}" , getServiceName(), started.get(), thread); } public void shutdown () { this .shutdown(false ); } public void shutdown (final boolean interrupt) { log.info("Try to shutdown service thread:{} started:{} lastThread:{}" , getServiceName(), started.get(), thread); if (!started.compareAndSet(true , false )) { return ; } this .stopped = true ; log.info("shutdown thread[{}] interrupt={} " , getServiceName(), interrupt); wakeup(); try { if (interrupt) { this .thread.interrupt(); } long beginTime = System.currentTimeMillis(); if (!this .thread.isDaemon()) { this .thread.join(this .getJoinTime()); } long elapsedTime = System.currentTimeMillis() - beginTime; log.info("join thread[{}], elapsed time: {}ms, join time:{}ms" , getServiceName(), elapsedTime, this .getJoinTime()); } catch (InterruptedException e) { log.error("Interrupted" , e); } } public long getJoinTime () { return JOIN_TIME; } public void makeStop () { if (!started.get()) { return ; } this .stopped = true ; log.info("makestop thread[{}] " , this .getServiceName()); } public void wakeup () { if (hasNotified.compareAndSet(false , true )) { waitPoint.countDown(); } } protected void waitForRunning (long interval) { if (hasNotified.compareAndSet(true , false )) { this .onWaitEnd(); return ; } waitPoint.reset(); try { waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted" , e); } finally { hasNotified.set(false ); this .onWaitEnd(); } } protected void onWaitEnd () { } public boolean isStopped () { return stopped; } public boolean isDaemon () { return isDaemon; } public void setDaemon (boolean daemon) { isDaemon = daemon; } }
如何使用呢?
仅需要继承抽象类ServiceThread
,并实现 getServiceName 和 run 方法即可。启动的时候,调用 start 方法 , 关闭的时候调用 shutdown 方法。
在 run 方法内部,使用抽象类的 waitForRunning 方法实现等待的效果,底层是通过 CountDownLatch 的 wait 方法 , 而不是 Thread.sleep 方式。
同时 ,在某些场景下,单线程可能需要与其他线程交互,ServiceThread 提供了类似于 wakeUp 唤醒方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class MyThreadService extends ServiceThread { @Override public String getServiceName () { return "my thread service" ; } @Override public void run () { while (!this .isStopped()) { try { waitForRunning(10 ); } catch (Exception e) { } } } }
Ps: CountDownLatch 的await
方法和Object.wait
方法在多线程编程中都有用于线程同步的功能 ,那它们之间有什么区别呢?在使用方式、同步机制和适用场景上加以说明。
使用方式:
CountDownLatch的await方法:这是CountDownLatch类中的一个方法,用于等待直到计数值变为0。当调用await方法时,当前线程会被挂起,直到CountDownLatch的计数器达到0,其他线程通过调用countDown()方法来减少计数器的值。CountDownLatch的await方法不需要传入参数,它直接等待计数器变为0。
Object.wait方法:这是Object类中的一个方法,用于在同步块中等待。当调用wait方法时,当前线程会释放锁并进入等待状态,直到其他线程调用该对象的notify或notifyAll方法来唤醒它。Object.wait方法需要传入一个参数,表示等待的时间长度(可选),如果不传参数则表示无限等待。
同步机制: CountDownLatch:它是一种单向同步机制,主要用于一个或多个线程等待其他线程完成操作。CountDownLatch的计数器从初始值递减到0,一旦计数器变为0,所有等待的线程会被唤醒继续执行。它适用于一个或多个线程等待其他线程完成的情况
Object.wait:基于共享对象的同步机制,允许线程之间的相互等待和唤醒。当一个线程调用wait方法时,它会释放锁并进入等待状态,直到其他线程通过notify或notifyAll方法唤醒它。适用于多个线程之间的相互协作和通信
适用场景:
CountDownLatch:适用于一个或多个线程等待其他线程完成操作的场景,例如在初始化过程中等待多个组件加载完成。 Object.wait/notify():适用于多个线程之间的相互协作和通信,例如生产者-消费者问题中的生产者和消费者之间的同步。