1. CountDownLatch2

为什么已经有了CountDownLatch,还要自定义一个CountDownLatch2呢?

下面先介绍下CountDownLatch

定义CountDownLatch是Java并发编程中的同步工具类,主要用于协调多个线程的执行顺序,允许一个或多个线程等待其他线程完成操作后再继续执行。其核心机制:通过计数器实现阻塞与唤醒,计数器初始值为任务线程数,每当一个线程完成任务时计数器减1,当计数器归零时等待线程被唤醒。

核心机制与原理

  1. 计数器初始化:创建时设定初始计数值(如等待的线程数),该值只能被减少,不可重置。
  2. await()方法:调用此方法的线程将被阻塞,直到计数器变为0。
  3. countDown()方法:每调用一次计数器减1,当减至0时唤醒所有等待线程。

典型应用场景

  • 主线程等待子线程完成:例如服务启动时等待所有组件加载完毕。
  • 多线程并发测试:通过统一触发多个线程模拟高并发场景。
  • 任务分阶段执行:如“赛跑”场景中,等待所有选手准备就绪后同时开始

代码示例

1
2
3
4
5
6
7
8
9
10
// 主线程等待3个子线程完成任务
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
// 子线程执行任务
latch.countDown();
}).start();
}
latch.await(); // 主线程阻塞等待
System.out.println("所有子线程完成");

但是,CountDownLatch的计数器在初始化后不能被重置,一旦计数器的值达到0,它就不能再次使用。所以我们定义了CountDownLatch2,能够重复使用的同步器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class CountDownLatch2 {
private final Sync sync;

public CountDownLatch2(int count) {
if (count < 0)
throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}


public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

private final int startCount;

Sync(int count) {
this.startCount = count;
setState(count);
}

int getCount() {
return getState();
}

@Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

@Override
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (; ; ) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

protected void reset() {
setState(startCount);
}
}
}

CountDownLatch2是如何实现的重复使用呢?相比CountDownLatch额外提供了reset()方法重置初始计数器。原理里很简单,Sync同步类通过setState(startCount)实现修改private volatile int state同步状态值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

private final int startCount;

Sync(int count) {
this.startCount = count;
setState(count);
}

int getCount() {
return getState();
}

@Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

@Override
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (; ; ) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

// 重要
protected void reset() {
setState(startCount);
}
}

2. ServiceThread

如下介绍的是RocketMq使用自定义的CountDownLatch实现的单线程永动机。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
public abstract class ServiceThread implements Runnable {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

private static final long JOIN_TIME = 90 * 1000;

protected Thread thread;
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
protected volatile boolean stopped = false;
protected boolean isDaemon = false;

//Make it able to restart the thread
private final AtomicBoolean started = new AtomicBoolean(false);

public ServiceThread() {

}

public abstract String getServiceName();

public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
log.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
}

public void shutdown() {
this.shutdown(false);
}

public void shutdown(final boolean interrupt) {
log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(true, false)) {
return;
}
this.stopped = true;
log.info("shutdown thread[{}] interrupt={} ", getServiceName(), interrupt);

//if thead is waiting, wakeup it
wakeup();

try {
if (interrupt) {
this.thread.interrupt();
}

long beginTime = System.currentTimeMillis();
if (!this.thread.isDaemon()) {
this.thread.join(this.getJoinTime());
}
long elapsedTime = System.currentTimeMillis() - beginTime;
log.info("join thread[{}], elapsed time: {}ms, join time:{}ms", getServiceName(), elapsedTime, this.getJoinTime());
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}

public long getJoinTime() {
return JOIN_TIME;
}

public void makeStop() {
if (!started.get()) {
return;
}
this.stopped = true;
log.info("makestop thread[{}] ", this.getServiceName());
}

public void wakeup() {
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}

protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}

//entry to wait
waitPoint.reset();

try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}

protected void onWaitEnd() {
}

public boolean isStopped() {
return stopped;
}

public boolean isDaemon() {
return isDaemon;
}

public void setDaemon(boolean daemon) {
isDaemon = daemon;
}
}

如何使用呢?

仅需要继承抽象类ServiceThread,并实现 getServiceNamerun 方法即可。启动的时候,调用 start 方法 , 关闭的时候调用 shutdown 方法。

在 run 方法内部,使用抽象类的 waitForRunning 方法实现等待的效果,底层是通过 CountDownLatch 的 wait 方法 , 而不是 Thread.sleep 方式。

同时 ,在某些场景下,单线程可能需要与其他线程交互,ServiceThread 提供了类似于 wakeUp 唤醒方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MyThreadService extends ServiceThread {
@Override
public String getServiceName() {
return "my thread service";
}

@Override
public void run() {
while (!this.isStopped()) {
try {
// 等待10ms
waitForRunning(10);
// do operate
} catch (Exception e) {

}
}
}
}

Ps: CountDownLatch 的await方法和Object.wait方法在多线程编程中都有用于线程同步的功能,那它们之间有什么区别呢?在使用方式、同步机制和适用场景上加以说明。

  • 使用方式:

    CountDownLatch的await方法‌:这是CountDownLatch类中的一个方法,用于等待直到计数值变为0。当调用await方法时,当前线程会被挂起,直到CountDownLatch的计数器达到0,其他线程通过调用countDown()方法来减少计数器的值。CountDownLatch的await方法不需要传入参数,它直接等待计数器变为0。

    Object.wait方法‌:这是Object类中的一个方法,用于在同步块中等待。当调用wait方法时,当前线程会释放锁并进入等待状态,直到其他线程调用该对象的notify或notifyAll方法来唤醒它。Object.wait方法需要传入一个参数,表示等待的时间长度(可选),如果不传参数则表示无限等待。

  • 同步机制:
    CountDownLatch:它是一种单向同步机制,主要用于一个或多个线程等待其他线程完成操作。CountDownLatch的计数器从初始值递减到0,一旦计数器变为0,所有等待的线程会被唤醒继续执行。它适用于一个或多个线程等待其他线程完成的情况

    Object.wait:基于共享对象的同步机制,允许线程之间的相互等待和唤醒。当一个线程调用wait方法时,它会释放锁并进入等待状态,直到其他线程通过notify或notifyAll方法唤醒它。适用于多个线程之间的相互协作和通信

  • 适用场景:

    CountDownLatch:适用于一个或多个线程等待其他线程完成操作的场景,例如在初始化过程中等待多个组件加载完成。
    Object.wait/notify():适用于多个线程之间的相互协作和通信,例如生产者-消费者问题中的生产者和消费者之间的同步。


本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.