这是rocketmq中封装的线程类org.apache.rocketmq.common.ServiceThread,rocketmq通过继承该线程类实现两个方法getServiceName()run(),来实现对线程功能的增强。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
public abstract class ServiceThread implements Runnable {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

private static final long JOIN_TIME = 90 * 1000;

protected Thread thread;
// 使用countdown实现线程的等待与唤醒
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
// 和started作用类似
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
// 线程停止标识,使用volatile修饰保证可见性
protected volatile boolean stopped = false;
protected boolean isDaemon = false;

//Make it able to restart the thread,使得可以反复调用strat和shutdown操作线程而不会产生影响。
private final AtomicBoolean started = new AtomicBoolean(false);

public ServiceThread() {

}

// 0. 设置线程名---重要,方便排查问题
public abstract String getServiceName();

// 1. 线程任务启动
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
// 如果线程已经启动,started为true,这里就直接返回
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
log.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
}

public void shutdown() {
this.shutdown(false);
}

public void shutdown(final boolean interrupt) {
log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(true, false)) {
return;
}
this.stopped = true;
log.info("shutdown thread[{}] interrupt={} ", getServiceName(), interrupt);

//if thead is waiting, wakeup it;如果线程是waiting状态,立即唤醒它
wakeup();

try {
if (interrupt) {
this.thread.interrupt();
}

long beginTime = System.currentTimeMillis();
if (!this.thread.isDaemon()) {
this.thread.join(this.getJoinTime());
}
long elapsedTime = System.currentTimeMillis() - beginTime;
log.info("join thread[{}], elapsed time: {}ms, join time:{}ms", getServiceName(), elapsedTime, this.getJoinTime());
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}

public long getJoinTime() {
return JOIN_TIME;
}

public void makeStop() {
if (!started.get()) {
return;
}
this.stopped = true;
log.info("makestop thread[{}] ", this.getServiceName());
}

public void wakeup() {
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}

protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}

//entry to wait
waitPoint.reset();

try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}

protected void onWaitEnd() {
}

public boolean isStopped() {
return stopped;
}

public boolean isDaemon() {
return isDaemon;
}

public void setDaemon(boolean daemon) {
isDaemon = daemon;
}
}

1. BatchUnregistrationService

继承ServiceThread类,实现两个方法,线程启动后不断从阻塞队列获取任务去执行。那如何停止呢?volatile boolean stopped = false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class BatchUnregistrationService extends ServiceThread {
private final RouteInfoManager routeInfoManager;
private BlockingQueue<UnRegisterBrokerRequestHeader> unregistrationQueue;
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

public BatchUnregistrationService(RouteInfoManager routeInfoManager, NamesrvConfig namesrvConfig) {
this.routeInfoManager = routeInfoManager;
this.unregistrationQueue = new LinkedBlockingQueue<>(namesrvConfig.getUnRegisterBrokerQueueCapacity());
}

/**
* Submits an unregister request to this queue.
*
* @param unRegisterRequest the request to submit
* @return {@code true} if the request was added to this queue, else {@code false}
*/
public boolean submit(UnRegisterBrokerRequestHeader unRegisterRequest) {
return unregistrationQueue.offer(unRegisterRequest);
}

@Override
public String getServiceName() {
return BatchUnregistrationService.class.getName();
}

@Override
public void run() {
while (!this.isStopped()) {
try {
final UnRegisterBrokerRequestHeader request = unregistrationQueue.take();
Set<UnRegisterBrokerRequestHeader> unregistrationRequests = new HashSet<>();
unregistrationQueue.drainTo(unregistrationRequests);

// Add polled request
unregistrationRequests.add(request);

this.routeInfoManager.unRegisterBroker(unregistrationRequests);
} catch (Throwable e) {
log.error("Handle unregister broker request failed", e);
}
}
}

// For test only
int queueLength() {
return this.unregistrationQueue.size();
}
}

2. PopLongPollingService

大名鼎鼎长连接拉取消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
public class PopLongPollingService extends ServiceThread {

private static final Logger POP_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
private final NettyRequestProcessor processor;
private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
private final ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
private long lastCleanTime = 0;

private final AtomicLong totalPollingNum = new AtomicLong(0);
private final boolean notifyLast;

public PopLongPollingService(BrokerController brokerController, NettyRequestProcessor processor, boolean notifyLast) {
this.brokerController = brokerController;
this.processor = processor;
// 100000 topic default, 100000 lru topic + cid + qid
this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L).build();
this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
this.notifyLast = notifyLast;
}

@Override
public String getServiceName() {
if (brokerController.getBrokerConfig().isInBrokerContainer()) {
return brokerController.getBrokerIdentity().getIdentifier() + PopLongPollingService.class.getSimpleName();
}
return PopLongPollingService.class.getSimpleName();
}

@Override
public void run() {
int i = 0;
while (!this.stopped) {
try {
this.waitForRunning(20);
i++;
if (pollingMap.isEmpty()) {
continue;
}
long tmpTotalPollingNum = 0;
for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : pollingMap.entrySet()) {
String key = entry.getKey();
ConcurrentSkipListSet<PopRequest> popQ = entry.getValue();
if (popQ == null) {
continue;
}
PopRequest first;
do {
first = popQ.pollFirst();
if (first == null) {
break;
}
if (!first.isTimeout()) {
if (popQ.add(first)) {
break;
} else {
POP_LOGGER.info("polling, add fail again: {}", first);
}
}
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("timeout , wakeUp polling : {}", first);
}
totalPollingNum.decrementAndGet();
wakeUp(first);
}
while (true);
if (i >= 100) {
long tmpPollingNum = popQ.size();
tmpTotalPollingNum = tmpTotalPollingNum + tmpPollingNum;
if (tmpPollingNum > 100) {
POP_LOGGER.info("polling queue {} , size={} ", key, tmpPollingNum);
}
}
}

if (i >= 100) {
POP_LOGGER.info("pollingMapSize={},tmpTotalSize={},atomicTotalSize={},diffSize={}",
pollingMap.size(), tmpTotalPollingNum, totalPollingNum.get(),
Math.abs(totalPollingNum.get() - tmpTotalPollingNum));
totalPollingNum.set(tmpTotalPollingNum);
i = 0;
}

// clean unused
if (lastCleanTime == 0 || System.currentTimeMillis() - lastCleanTime > 5 * 60 * 1000) {
cleanUnusedResource();
}
} catch (Throwable e) {
POP_LOGGER.error("checkPolling error", e);
}
}
// clean all;
try {
for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : pollingMap.entrySet()) {
ConcurrentSkipListSet<PopRequest> popQ = entry.getValue();
PopRequest first;
while ((first = popQ.pollFirst()) != null) {
wakeUp(first);
}
}
} catch (Throwable e) {
}
}

public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId) {
this.notifyMessageArrivingWithRetryTopic(topic, queueId, -1L, null, 0L, null, null);
}

public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId, long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String notifyTopic;
if (KeyBuilder.isPopRetryTopicV2(topic)) {
notifyTopic = KeyBuilder.parseNormalTopic(topic);
} else {
notifyTopic = topic;
}
notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
}

public void notifyMessageArriving(final String topic, final int queueId, long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
if (cids == null) {
return;
}
long interval = brokerController.getBrokerConfig().getPopLongPollingForceNotifyInterval();
boolean force = interval > 0L && offset % interval == 0L;
for (Map.Entry<String, Byte> cid : cids.entrySet()) {
if (queueId >= 0) {
notifyMessageArriving(topic, -1, cid.getKey(), force, tagsCode, msgStoreTime, filterBitMap, properties);
}
notifyMessageArriving(topic, queueId, cid.getKey(), force, tagsCode, msgStoreTime, filterBitMap, properties);
}
}

public boolean notifyMessageArriving(final String topic, final int queueId, final String cid,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
return notifyMessageArriving(topic, queueId, cid, false, tagsCode, msgStoreTime, filterBitMap, properties, null);
}

public boolean notifyMessageArriving(final String topic, final int queueId, final String cid, boolean force,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
return notifyMessageArriving(topic, queueId, cid, force, tagsCode, msgStoreTime, filterBitMap, properties, null);
}

public boolean notifyMessageArriving(final String topic, final int queueId, final String cid, boolean force,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties, CommandCallback callback) {
ConcurrentSkipListSet<PopRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId));
if (remotingCommands == null || remotingCommands.isEmpty()) {
return false;
}

PopRequest popRequest = pollRemotingCommands(remotingCommands);
if (popRequest == null) {
return false;
}

if (!force && popRequest.getMessageFilter() != null && popRequest.getSubscriptionData() != null) {
boolean match = popRequest.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
if (match && properties != null) {
match = popRequest.getMessageFilter().isMatchedByCommitLog(null, properties);
}
if (!match) {
remotingCommands.add(popRequest);
totalPollingNum.incrementAndGet();
return false;
}
}

if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("lock release, new msg arrive, wakeUp: {}", popRequest);
}

return wakeUp(popRequest, callback);
}

public boolean wakeUp(final PopRequest request) {
return wakeUp(request, null);
}

public boolean wakeUp(final PopRequest request, CommandCallback callback) {
if (request == null || !request.complete()) {
return false;
}

if (callback != null && request.getRemotingCommand() != null) {
if (request.getRemotingCommand().getCallbackList() == null) {
request.getRemotingCommand().setCallbackList(new ArrayList<>());
}
request.getRemotingCommand().getCallbackList().add(callback);
}

if (!request.getCtx().channel().isActive()) {
return false;
}

Runnable run = () -> {
try {
final RemotingCommand response = processor.processRequest(request.getCtx(), request.getRemotingCommand());
if (response != null) {
response.setOpaque(request.getRemotingCommand().getOpaque());
response.markResponseType();
NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future -> {
if (!future.isSuccess()) {
POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause());
POP_LOGGER.error(request.toString());
POP_LOGGER.error(response.toString());
}
});
}
} catch (Exception e1) {
POP_LOGGER.error("ExecuteRequestWhenWakeup run", e1);
}
};

this.brokerController.getPullMessageExecutor().submit(
new RequestTask(run, request.getChannel(), request.getRemotingCommand()));
return true;
}

/**
* @param ctx
* @param remotingCommand
* @param requestHeader
* @return
*/
public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand remotingCommand,
final PollingHeader requestHeader) {
return this.polling(ctx, remotingCommand, requestHeader, null, null);
}

public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand remotingCommand,
final PollingHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter) {
if (requestHeader.getPollTime() <= 0 || this.isStopped()) {
return NOT_POLLING;
}
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic());
if (cids == null) {
cids = new ConcurrentHashMap<>();
ConcurrentHashMap<String, Byte> old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
if (old != null) {
cids = old;
}
}
cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
long expired = requestHeader.getBornTime() + requestHeader.getPollTime();
final PopRequest request = new PopRequest(remotingCommand, ctx, expired, subscriptionData, messageFilter);
boolean isFull = totalPollingNum.get() >= this.brokerController.getBrokerConfig().getMaxPopPollingSize();
if (isFull) {
POP_LOGGER.info("polling {}, result POLLING_FULL, total:{}", remotingCommand, totalPollingNum.get());
return POLLING_FULL;
}
boolean isTimeout = request.isTimeout();
if (isTimeout) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("polling {}, result POLLING_TIMEOUT", remotingCommand);
}
return POLLING_TIMEOUT;
}
String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
requestHeader.getQueueId());
ConcurrentSkipListSet<PopRequest> queue = pollingMap.get(key);
if (queue == null) {
queue = new ConcurrentSkipListSet<>(PopRequest.COMPARATOR);
ConcurrentSkipListSet<PopRequest> old = pollingMap.putIfAbsent(key, queue);
if (old != null) {
queue = old;
}
} else {
// check size
int size = queue.size();
if (size > brokerController.getBrokerConfig().getPopPollingSize()) {
POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}", remotingCommand, size);
return POLLING_FULL;
}
}
if (queue.add(request)) {
remotingCommand.setSuspended(true);
totalPollingNum.incrementAndGet();
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("polling {}, result POLLING_SUC", remotingCommand);
}
return POLLING_SUC;
} else {
POP_LOGGER.info("polling {}, result POLLING_FULL, add fail, {}", request, queue);
return POLLING_FULL;
}
}

public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
return pollingMap;
}

private void cleanUnusedResource() {
try {
{
Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>> topicCidMapIter = topicCidMap.entrySet().iterator();
while (topicCidMapIter.hasNext()) {
Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = topicCidMapIter.next();
String topic = entry.getKey();
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("remove nonexistent topic {} in topicCidMap!", topic);
topicCidMapIter.remove();
continue;
}
Iterator<Map.Entry<String, Byte>> cidMapIter = entry.getValue().entrySet().iterator();
while (cidMapIter.hasNext()) {
Map.Entry<String, Byte> cidEntry = cidMapIter.next();
String cid = cidEntry.getKey();
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in topicCidMap!", cid, topic);
cidMapIter.remove();
}
}
}
}

{
Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>> pollingMapIter = pollingMap.entrySet().iterator();
while (pollingMapIter.hasNext()) {
Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry = pollingMapIter.next();
if (entry.getKey() == null) {
continue;
}
String[] keyArray = entry.getKey().split(PopAckConstants.SPLIT);
if (keyArray.length != 3) {
continue;
}
String topic = keyArray[0];
String cid = keyArray[1];
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("remove nonexistent topic {} in pollingMap!", topic);
pollingMapIter.remove();
continue;
}
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in pollingMap!", cid, topic);
pollingMapIter.remove();
}
}
}
} catch (Throwable e) {
POP_LOGGER.error("cleanUnusedResource", e);
}

lastCleanTime = System.currentTimeMillis();
}

private PopRequest pollRemotingCommands(ConcurrentSkipListSet<PopRequest> remotingCommands) {
if (remotingCommands == null || remotingCommands.isEmpty()) {
return null;
}

PopRequest popRequest;
do {
if (notifyLast) {
popRequest = remotingCommands.pollLast();
} else {
popRequest = remotingCommands.pollFirst();
}
totalPollingNum.decrementAndGet();
} while (popRequest != null && !popRequest.getChannel().isActive());

return popRequest;
}
}

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.