1. Namesrv启动流程

1.1 NamesrvStartup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * JVM entry point shown for NamesrvStartup: runs the two startup phases in order.
 *
 * @param args command-line arguments, handed unchanged to main0
 */
public static void main(String[] args) {
// 1. Start the NamesrvController
main0(args);
// 2. Start the ControllerManager (controllerManagerMain only does so when it is enabled in namesrvConfig)
controllerManagerMain();
}

/**
 * Parses the startup arguments and then creates and starts the NamesrvController.
 *
 * <p>Any Throwable raised on the way has its stack trace printed and the JVM is asked to
 * exit with status -1.
 *
 * @param args command-line arguments
 * @return the controller returned by createAndStartNamesrvController(); the trailing
 *         {@code null} is only reachable if {@code System.exit} returns
 */
public static NamesrvController main0(String[] args) {
    try {
        // a. Parse the command line and the config file
        parseCommandlineAndConfigFile(args);
        // b. Create and start the NamesrvController, handing it straight back to the caller
        return createAndStartNamesrvController();
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

// Skipped for now in this walkthrough
/**
 * Creates and starts the ControllerManager, but only when
 * {@code namesrvConfig.isEnableControllerInNamesrv()} is true.
 *
 * <p>Any Throwable has its stack trace printed and the JVM is asked to exit with status -1.
 *
 * @return the started ControllerManager, or {@code null} when the controller is not enabled
 */
public static ControllerManager controllerManagerMain() {
    ControllerManager manager = null;
    try {
        if (namesrvConfig.isEnableControllerInNamesrv()) {
            // a. Create and start the ControllerManager
            manager = createAndStartControllerManager();
        }
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return manager;
}

1.2 NamesrvController

1.2.1 主流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 1. Initialization
/**
 * Runs the NamesrvController initialization steps in a fixed order.
 *
 * @return always {@code true} in the code shown here
 */
public boolean initialize() {
// a. Load the configuration
loadConfig();
// b. Initialize the network components
initiateNetworkComponents();
// c. Initialize the thread pools
initiateThreadExecutors();
// d. Register the request processor(s)
registerProcessor();
// e. Start the scheduled (periodic) tasks
startScheduleService();
// f. Initialize the SSL context
initiateSslContext();
// g. Initialize the RPC hooks
initiateRpcHooks();
return true;
}

// 2. Start
/**
 * Starts the controller's services: remoting server, remoting client, the optional
 * file watch service, and finally the route info manager.
 *
 * @throws Exception propagated from any of the services being started
 */
public void start() throws Exception {
    // a. Start the remoting server
    this.remotingServer.start();

    // A configured listen port of 0 is replaced by the port the server reports as its local listen port.
    if (nettyServerConfig.getListenPort() == 0) {
        nettyServerConfig.setListenPort(this.remotingServer.localListenPort());
    }

    // Give the remoting client this instance's own "address:port" as its only NameServer address.
    final String selfAddress = NetworkUtil.getLocalAddress() + ":" + nettyServerConfig.getListenPort();
    this.remotingClient.updateNameServerAddressList(Collections.singletonList(selfAddress));

    // b. Start the remoting client
    this.remotingClient.start();

    // c. Start the file watch service, when one was created
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }

    // d. Start the route info manager
    this.routeInfoManager.start();
}

1.2.2 初始化线程池和启动定时服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 1. Initialize the thread pools
/**
 * Creates the two request executors. Each one is built with the same value for its first two
 * size arguments, a keep-alive of 1000 * 60 milliseconds, and a bounded LinkedBlockingQueue
 * whose capacity comes from namesrvConfig.
 */
private void initiateThreadExecutors() {
    final int keepAliveMillis = 1000 * 60;

    // a. defaultExecutor
    this.defaultThreadPoolQueue =
        new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());
    this.defaultExecutor = ThreadUtils.newThreadPoolExecutor(
        this.namesrvConfig.getDefaultThreadPoolNums(),
        this.namesrvConfig.getDefaultThreadPoolNums(),
        keepAliveMillis,
        TimeUnit.MILLISECONDS,
        this.defaultThreadPoolQueue,
        new ThreadFactoryImpl("RemotingExecutorThread_"));

    // b. clientRequestExecutor
    this.clientRequestThreadPoolQueue =
        new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());
    this.clientRequestExecutor = ThreadUtils.newThreadPoolExecutor(
        this.namesrvConfig.getClientRequestThreadPoolNums(),
        this.namesrvConfig.getClientRequestThreadPoolNums(),
        keepAliveMillis,
        TimeUnit.MILLISECONDS,
        this.clientRequestThreadPoolQueue,
        new ThreadFactoryImpl("ClientRequestExecutorThread_"));
}

// 2. Start the scheduled tasks
/**
 * Schedules the three periodic NameServer tasks: the inactive-broker scan, the KV-config
 * dump and the watermark print.
 */
private void startScheduleService() {
    // a. Inactive-broker scan: first run after 5 ms, then every
    //    namesrvConfig.getScanNotActiveBrokerInterval() ms, on its own executor.
    final Runnable brokerScan = this.routeInfoManager::scanNotActiveBroker;
    this.scanExecutorService.scheduleAtFixedRate(
        brokerScan, 5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);

    // b. KV-config dump: first run after 1 minute, then every 10 minutes.
    final Runnable kvConfigDump = this.kvConfigManager::printAllPeriodically;
    this.scheduledExecutorService.scheduleAtFixedRate(kvConfigDump, 1, 10, TimeUnit.MINUTES);

    // c. Watermark print: first run after 10 seconds, then every second.
    //    Failures are logged and not rethrown from the task.
    final Runnable waterMarkPrint = () -> {
        try {
            this.printWaterMark();
        } catch (Throwable e) {
            LOGGER.error("printWaterMark error.", e);
        }
    };
    this.scheduledExecutorService.scheduleAtFixedRate(waterMarkPrint, 10, 1, TimeUnit.SECONDS);
}

1.2.3 启动路由信息服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 1. RouteInfoManager: starting the manager only starts its unRegisterService
public void start() {
this.unRegisterService.start();
}

// 2. BatchUnregistrationService: removes brokers that are no longer active
/**
 * Worker loop. Blocks until one unregister request is available, collects whatever else is
 * queued at that moment into the same batch, and hands the batch to
 * RouteInfoManager#unRegisterBroker. Any Throwable is logged and the loop continues until
 * isStopped() reports true.
 */
@Override
public void run() {
    while (!this.isStopped()) {
        try {
            final UnRegisterBrokerRequestHeader head = unregistrationQueue.take();
            Set<UnRegisterBrokerRequestHeader> batch = new HashSet<>();
            unregistrationQueue.drainTo(batch);

            // The blocking take() already removed the head request, so add it back to the batch.
            batch.add(head);

            this.routeInfoManager.unRegisterBroker(batch);
        } catch (Throwable e) {
            log.error("Handle unregister broker request failed", e);
        }
    }
}

2. RouteInfoManager核心类详解

2.1 Namesrv保存了哪些信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Read/write lock used by the route-table methods shown in this file: the register/unregister/delete
// methods take the write lock, onChannelDestroy takes the read lock.
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// 1. topic -> (brokerName -> QueueData): which brokers host a topic and with what queue settings
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
// 2. brokerName -> BrokerData (cluster, broker name, brokerId -> address map)
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
// 3. clusterName -> names of the brokers in that cluster
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 4. broker address (cluster + addr) -> liveness information of that broker
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

////////////////////////////////////////////////////////////////////////

// 5. broker address -> filter server list reported by the broker at registration
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// 6. topic -> (brokerName -> TopicQueueMappingInfo), filled in by registerBroker
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;

topic brokerName brokerAddr

clusterName brokerName

brokerAddrInfo brokerLiveInfo

2.1.1 topicQueueTable

// topic -> (brokerName -> QueueData)
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"topicQueueTable": {
"topic1": [
{
"brokerName": "broker-a",
"readQueueNums": 4,
"writeQueueNums": 4,
"perm": 6,
"topicSysFlag": 0
},
{
"brokerName": "broker-b",
"readQueueNums": 4,
"writeQueueNums": 4,
"perm": 6,
"topicSysFlag": 0
}
],
"topic other": []
}
}

2.1.2 brokerAddrTable

// brokerName -> BrokerData
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"brokerAddrTable": {
"broker-a": {
"cluster": "c1",
"brokerName": "broker-a",
"brokerAddrs": {
"0": "192.168.56.1:10000",
"1": "192.168.56.2:10000"
}
},
"broker-b": {
"cluster": "c1",
"brokerName": "broker-b",
"brokerAddrs": {
"0": "192.168.56.3:10000",
"1": "192.168.56.4:10000"
}
}
}
}

2.1.3 clusterAddrTable

// clusterName -> set of broker names
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

1
2
3
4
5
6
7
8
{
"clusterAddrTable": {
"c1": [
{"broker": "broker-a"},
{"broker": "broker-b"}
]
}
}

2.1.4 brokerLiveTable

// broker address (BrokerAddrInfo) -> BrokerLiveInfo
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"brokerLiveTable": {
"192.168.56.1:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion": "versionObj",
"channel": "channelObj",
"haServerAddr": "192.168.56.2:10000"
},
"192.168.56.2:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion": "versionObj",
"channel": "channelObj",
"haServerAddr": ""
},
"192.168.56.3:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion": "versionObj",
"channel": "channelObj",
"haServerAddr": "192.168.56.4:10000"
},
"192.168.56.4:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion": "versionObj",
"channel": "channelObj",
"haServerAddr": ""
}
}
}
1
2
3
4
5
6
7
8
9
10
// Per-broker queue settings of one topic; stored in topicQueueTable keyed by broker name.
// (Excerpt: only the fields are shown.)
public class QueueData implements Comparable<QueueData> {
// Name of the broker these settings belong to
private String brokerName;
// Number of read queues
private int readQueueNums;
// Number of write queues
private int writeQueueNums;
// Permission value — presumably a PermName bit mask; TODO confirm against PermName
private int perm;
// Topic system flag — semantics are not visible in this excerpt
private int topicSysFlag;
}
1
2
3
4
5
6
7
8
9
10
// Details of one broker name (a master/slave group); stored in brokerAddrTable keyed by broker name.
// (Excerpt: only the fields are shown.)
public class BrokerData implements Comparable<BrokerData> {
// Name of the cluster the broker belongs to
private String cluster;
// Broker name shared by all addresses in brokerAddrs
private String brokerName;
// brokerId -> broker address; registerBroker treats MixAll.MASTER_ID as the master's id
private HashMap<Long, String> brokerAddrs;
// Zone name, set from the zoneName passed to registerBroker
private String zoneName;
// Random source; its use is not visible in this excerpt
private final Random random = new Random();

// Set by registerBroker; when true, the slave with the smallest brokerId has its write permission removed
private boolean enableActingMaster = false;

}
1
2
3
4
// Key type of brokerLiveTable and filterServerTable: a broker address qualified by its cluster name.
// NOTE(review): as a map key it needs equals/hashCode; they are not part of this excerpt.
class BrokerAddrInfo {
// Name of the cluster the broker belongs to
private String clusterName;
// Broker address
private String brokerAddr;
}
1
2
3
4
5
6
7
// Liveness record of one broker address; stored in brokerLiveTable.
class BrokerLiveInfo {
// Timestamp (System.currentTimeMillis()) recorded when registerBroker stores the record
private long lastUpdateTimestamp;
// Timeout in ms; scanNotActiveBroker expires the broker once lastUpdateTimestamp + this value is in the past
private long heartbeatTimeoutMillis;
// Data version reported with the broker's topic config (a fresh DataVersion when none was sent)
private DataVersion dataVersion;
// Channel of the broker connection; closed by scanNotActiveBroker on expiry
private Channel channel;
// HA server address reported by the broker; the master's value is returned to registering non-master brokers
private String haServerAddr;
}
1
2
3
4
5
6
7
8
9
10
11
12
// Queue mapping information of one topic on one broker; registerBroker stores instances in
// topicQueueMappingInfoTable keyed by topic and then by getBname(). (Excerpt: fields only.)
public class TopicQueueMappingInfo extends RemotingSerializable {
// Level constant; its use is not visible in this excerpt
public static final int LEVEL_0 = 0;

String topic; // redundant field
// Scope of the mapping; defaults to the global metadata scope constant
String scope = MixAll.METADATA_SCOPE_GLOBAL;
// Total number of queues — presumably logical queues of the topic; TODO confirm
int totalQueues;
String bname; //identify the hosted broker name
long epoch; //important to fence the old dirty data
boolean dirty; //indicate if the data is dirty
//register to broker to construct the route
// Maps a logical queue id to a physical queue id
protected ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<>();
}

2.2 路由信息的管理

2.2.1 topic管理

新增、删除topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// 1. 
/**
 * Registers the route of {@code topic}, or merges it into an existing route, for the brokers
 * named in {@code queueDatas}.
 *
 * <p>Does nothing when {@code queueDatas} is null or empty. Every entry must name a broker that
 * is already present in {@code brokerAddrTable}; on the first unknown broker a warning is logged
 * and the method returns. For a topic that already exists, entries processed before the unknown
 * broker have already been merged into the live map at that point; for a new topic nothing is
 * stored in that case.
 *
 * <p>The write lock is acquired before the guarded {@code try} block, so {@code unlock()} only
 * runs when the lock is actually held. Previously an interrupt while waiting for the lock made
 * the {@code finally} block throw {@code IllegalMonitorStateException} and dropped the interrupt
 * status; now the interrupt is logged, the flag is restored and the method returns.
 *
 * @param topic topic name
 * @param queueDatas queue settings, one entry per broker
 */
public void registerTopic(final String topic, List<QueueData> queueDatas) {
    if (queueDatas == null || queueDatas.isEmpty()) {
        return;
    }

    try {
        this.lock.writeLock().lockInterruptibly();
    } catch (InterruptedException e) {
        // Lock was never acquired: restore the interrupt status and leave the tables untouched.
        Thread.currentThread().interrupt();
        log.error("registerTopic Exception", e);
        return;
    }

    try {
        if (this.topicQueueTable.containsKey(topic)) {
            // Existing topic: merge entry by entry into the live map.
            Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
            for (QueueData queueData : queueDatas) {
                if (!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
                    log.warn("Register topic contains illegal broker, {}, {}", topic, queueData);
                    return;
                }
                queueDataMap.put(queueData.getBrokerName(), queueData);
            }
            log.info("Topic route already exist.{}, {}", topic, this.topicQueueTable.get(topic));
        } else {
            // check and construct queue data map
            Map<String, QueueData> queueDataMap = new HashMap<>();
            for (QueueData queueData : queueDatas) {
                if (!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
                    log.warn("Register topic contains illegal broker, {}, {}", topic, queueData);
                    return;
                }
                queueDataMap.put(queueData.getBrokerName(), queueData);
            }

            this.topicQueueTable.put(topic, queueDataMap);
            log.info("Register topic route:{}, {}", topic, queueDatas);
        }
    } catch (Exception e) {
        log.error("registerTopic Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }
}

// 2.
/**
 * Removes {@code topic} from every broker of the cluster {@code clusterName}; when no broker
 * hosts the topic any more, the topic itself is removed from {@code topicQueueTable}.
 *
 * <p>Does nothing when the cluster is unknown or has no brokers, or when the topic has no
 * route entry.
 *
 * <p>The write lock is acquired before the guarded {@code try} block, so {@code unlock()} only
 * runs when the lock is actually held. Previously an interrupt while waiting for the lock made
 * the {@code finally} block throw {@code IllegalMonitorStateException} and dropped the interrupt
 * status; now the interrupt is logged, the flag is restored and the method returns.
 *
 * @param topic topic to delete
 * @param clusterName cluster whose brokers should drop the topic
 */
public void deleteTopic(final String topic, final String clusterName) {
    try {
        this.lock.writeLock().lockInterruptibly();
    } catch (InterruptedException e) {
        // Lock was never acquired: restore the interrupt status and leave the tables untouched.
        Thread.currentThread().interrupt();
        log.error("deleteTopic Exception", e);
        return;
    }

    try {
        //get all the brokerNames for the specified cluster
        Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
        if (brokerNames == null || brokerNames.isEmpty()) {
            return;
        }
        //get the store information for single topic
        Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
        if (queueDataMap != null) {
            for (String brokerName : brokerNames) {
                final QueueData removedQD = queueDataMap.remove(brokerName);
                if (removedQD != null) {
                    log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, topic, removedQD);
                }
            }
            if (queueDataMap.isEmpty()) {
                log.info("deleteTopic, remove the topic all queue {} {}", clusterName, topic);
                this.topicQueueTable.remove(topic);
            }
        }
    } catch (Exception e) {
        log.error("deleteTopic Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }
}

2.2.2 broker管理

新增、删除broker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
// 1. 
/**
 * Registers a broker (or refreshes its registration) and updates all route tables under the
 * write lock: cluster membership, the brokerId-to-address map, per-topic queue data, topic queue
 * mapping info, the broker's liveness record and its filter server list.
 *
 * <p>The write lock is acquired before the guarded {@code try} block, so {@code unlock()} only
 * runs when the lock is actually held. Previously an interrupt while waiting for the lock made
 * the {@code finally} block throw {@code IllegalMonitorStateException} and dropped the interrupt
 * status; now the interrupt is logged, the flag is restored and the (empty) result is returned.
 *
 * <p>NOTE(review): {@code topicConfigWrapper} is null-checked further down, but is dereferenced
 * without a check in the state-version comparison and in the "has not registered" check; a null
 * wrapper on those paths ends in the generic exception handler. Confirm whether callers may
 * pass null.
 *
 * @param clusterName cluster the broker belongs to
 * @param brokerAddr address of the registering broker
 * @param brokerName broker name (shared by a master and its slaves)
 * @param brokerId broker id; {@code MixAll.MASTER_ID} marks the master
 * @param haServerAddr HA server address reported by the broker
 * @param zoneName zone name stored on the broker data
 * @param timeoutMillis heartbeat timeout; {@code DEFAULT_BROKER_CHANNEL_EXPIRED_TIME} when null
 * @param enableActingMaster acting-master switch; null is treated as an old-version broker
 * @param topicConfigWrapper topic configuration and data version reported by the broker
 * @param filterServerList filter servers; null leaves the table untouched, empty removes the entry
 * @param channel channel of the broker connection
 * @return the registration result (master address and HA address are filled in for a non-master
 *         broker whose master is alive); an empty result when the registration is ignored because
 *         of a state-version conflict, on interruption or on an exception; {@code null} when the
 *         broker id is not yet registered and the reported topic config table has exactly one entry
 */
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final String zoneName,
    final Long timeoutMillis,
    final Boolean enableActingMaster,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        this.lock.writeLock().lockInterruptibly();
    } catch (InterruptedException e) {
        // Lock was never acquired: restore the interrupt status and leave the tables untouched.
        Thread.currentThread().interrupt();
        log.error("registerBroker Exception", e);
        return result;
    }

    try {
        //init or update the cluster info
        Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
        brokerNames.add(brokerName);

        boolean registerFirst = false;

        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
        if (null == brokerData) {
            registerFirst = true;
            brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
            this.brokerAddrTable.put(brokerName, brokerData);
        }

        // A broker that does not send enableActingMaster is treated as an old-version broker.
        boolean isOldVersionBroker = enableActingMaster == null;
        brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
        brokerData.setZoneName(zoneName);

        Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

        // The minimum broker id changes when the registering id is smaller than the current minimum.
        boolean isMinBrokerIdChanged = false;
        long prevMinBrokerId = 0;
        if (!brokerAddrsMap.isEmpty()) {
            prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
        }

        if (brokerId < prevMinBrokerId) {
            isMinBrokerIdChanged = true;
        }

        //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
        //The same IP:PORT must only have one record in brokerAddrTable
        brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());

        //If the brokerId is already held by another address whose stateVersion is bigger than
        //the registering one, the registration is ignored.
        String oldBrokerAddr = brokerAddrsMap.get(brokerId);
        if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
            BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));

            if (null != oldBrokerInfo) {
                long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
                long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
                if (oldStateVersion > newStateVersion) {
                    log.warn("Registering Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
                            "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
                        clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
                    //Remove the rejected brokerAddr from brokerLiveTable.
                    brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
                    return result;
                }
            }
        }

        if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
            log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
                topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
            return null;
        }

        String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
        registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));

        boolean isMaster = MixAll.MASTER_ID == brokerId;

        // "Prime slave": a new-version, non-master broker that currently holds the smallest broker id.
        boolean isPrimeSlave = !isOldVersionBroker && !isMaster
            && brokerId == Collections.min(brokerAddrsMap.keySet());

        if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {

            ConcurrentMap<String, TopicConfig> tcTable =
                topicConfigWrapper.getTopicConfigTable();

            if (tcTable != null) {

                TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
                Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();

                // Delete the topics that don't exist in tcTable from the current broker
                // Static topic is not supported currently
                if (namesrvConfig.isDeleteTopicWithBrokerRegistration() && topicQueueMappingInfoMap.isEmpty()) {
                    final Set<String> oldTopicSet = topicSetOfBrokerName(brokerName);
                    final Set<String> newTopicSet = tcTable.keySet();
                    final Sets.SetView<String> toDeleteTopics = Sets.difference(oldTopicSet, newTopicSet);
                    for (final String toDeleteTopic : toDeleteTopics) {
                        Map<String, QueueData> queueDataMap = topicQueueTable.get(toDeleteTopic);
                        final QueueData removedQD = queueDataMap.remove(brokerName);
                        if (removedQD != null) {
                            log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, toDeleteTopic, removedQD);
                        }

                        if (queueDataMap.isEmpty()) {
                            log.info("deleteTopic, remove the topic all queue {}", toDeleteTopic);
                            topicQueueTable.remove(toDeleteTopic);
                        }
                    }
                }

                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                    if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
                        topicConfigWrapper.getDataVersion(), brokerName,
                        entry.getValue().getTopicName())) {
                        final TopicConfig topicConfig = entry.getValue();
                        // In Slave Acting Master mode, Namesrv will regard the surviving Slave with the smallest brokerId as the "agent" Master, and modify the brokerPermission to read-only.
                        if (isPrimeSlave && brokerData.isEnableActingMaster()) {
                            // Wipe write perm for prime slave
                            topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
                        }
                        this.createAndUpdateQueueData(brokerName, topicConfig);
                    }
                }

                if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
                    //the topicQueueMappingInfoMap should never be null, but can be empty
                    for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
                        if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
                            topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
                        }
                        //Note asset brokerName equal entry.getValue().getBname()
                        //here use the mappingDetail.bname
                        topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
                    }
                }
            }
        }

        // Record (or refresh) the liveness info of the registering address.
        BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
        BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
            new BrokerLiveInfo(
                System.currentTimeMillis(),
                timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
                topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
                channel,
                haServerAddr));
        if (null == prevBrokerLiveInfo) {
            log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
        }

        // A null list leaves the filter server entry untouched; an empty list removes it.
        if (filterServerList != null) {
            if (filterServerList.isEmpty()) {
                this.filterServerTable.remove(brokerAddrInfo);
            } else {
                this.filterServerTable.put(brokerAddrInfo, filterServerList);
            }
        }

        // A non-master broker is told its master's address and HA address when the master is alive.
        if (MixAll.MASTER_ID != brokerId) {
            String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
            if (masterAddr != null) {
                BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
                BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
                if (masterLiveInfo != null) {
                    result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
                    result.setMasterAddr(masterAddr);
                }
            }
        }

        if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
            notifyMinBrokerIdChanged(brokerAddrsMap, null,
                this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }

    return result;
}

// 2.
/**
 * Processes a batch of unregister requests under the write lock: removes each broker address
 * from the liveness, filter-server and address tables, drops broker names and clusters that
 * become empty, cleans the topic routes of the affected brokers and, when enabled, notifies
 * brokers whose minimum broker id changed.
 *
 * <p>The write lock is acquired before the guarded {@code try} block, so {@code unlock()} only
 * runs when the lock is actually held. Previously an interrupt while waiting for the lock made
 * the {@code finally} block throw {@code IllegalMonitorStateException} and dropped the interrupt
 * status; now the interrupt is logged, the flag is restored and the method returns.
 *
 * @param unRegisterRequests the unregister requests to process as one batch
 */
public void unRegisterBroker(Set<UnRegisterBrokerRequestHeader> unRegisterRequests) {
    // Broker names that lost their last address vs. names that merely lost one address.
    Set<String> removedBroker = new HashSet<>();
    Set<String> reducedBroker = new HashSet<>();
    Map<String, BrokerStatusChangeInfo> needNotifyBrokerMap = new HashMap<>();

    try {
        this.lock.writeLock().lockInterruptibly();
    } catch (InterruptedException e) {
        // Lock was never acquired: restore the interrupt status and leave the tables untouched.
        Thread.currentThread().interrupt();
        log.error("unregisterBroker Exception", e);
        return;
    }

    try {
        for (final UnRegisterBrokerRequestHeader unRegisterRequest : unRegisterRequests) {
            final String brokerName = unRegisterRequest.getBrokerName();
            final String clusterName = unRegisterRequest.getClusterName();
            final String brokerAddr = unRegisterRequest.getBrokerAddr();

            BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);

            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddrInfo);
            log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                brokerLiveInfo != null ? "OK" : "Failed",
                brokerAddrInfo
            );

            this.filterServerTable.remove(brokerAddrInfo);

            boolean removeBrokerName = false;
            boolean isMinBrokerIdChanged = false;
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null != brokerData) {
                // The minimum id changes when the leaving broker currently holds it.
                if (!brokerData.getBrokerAddrs().isEmpty() &&
                    unRegisterRequest.getBrokerId().equals(Collections.min(brokerData.getBrokerAddrs().keySet()))) {
                    isMinBrokerIdChanged = true;
                }
                boolean removed = brokerData.getBrokerAddrs().entrySet().removeIf(item -> item.getValue().equals(brokerAddr));
                log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                    removed ? "OK" : "Failed",
                    brokerAddrInfo
                );
                if (brokerData.getBrokerAddrs().isEmpty()) {
                    this.brokerAddrTable.remove(brokerName);
                    log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                        brokerName
                    );

                    removeBrokerName = true;
                } else if (isMinBrokerIdChanged) {
                    needNotifyBrokerMap.put(brokerName, new BrokerStatusChangeInfo(
                        brokerData.getBrokerAddrs(), brokerAddr, null));
                }
            }

            if (removeBrokerName) {
                Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                if (nameSet != null) {
                    boolean removed = nameSet.remove(brokerName);
                    log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                        removed ? "OK" : "Failed",
                        brokerName);

                    if (nameSet.isEmpty()) {
                        this.clusterAddrTable.remove(clusterName);
                        log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                            clusterName
                        );
                    }
                }
                removedBroker.add(brokerName);
            } else {
                reducedBroker.add(brokerName);
            }
        }

        cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);

        if (!needNotifyBrokerMap.isEmpty() && namesrvConfig.isNotifyMinBrokerIdChanged()) {
            notifyMinBrokerIdChanged(needNotifyBrokerMap);
        }
    } catch (Exception e) {
        log.error("unregisterBroker Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }
}

2.2.3 scanNotActiveBroker

每隔5s扫描一次broker列表,删除不活跃的broker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Walks brokerLiveTable and, for every broker whose last update plus heartbeat timeout lies
 * in the past, closes its channel and reports the broker through onChannelDestroy.
 * Exceptions are logged and end the current scan.
 */
public void scanNotActiveBroker() {
    try {
        log.info("start scanNotActiveBroker");
        for (Entry<BrokerAddrInfo, BrokerLiveInfo> liveEntry : this.brokerLiveTable.entrySet()) {
            final long lastUpdate = liveEntry.getValue().getLastUpdateTimestamp();
            final long timeoutMillis = liveEntry.getValue().getHeartbeatTimeoutMillis();
            final long expiresAt = lastUpdate + timeoutMillis;
            if (expiresAt >= System.currentTimeMillis()) {
                // Heartbeat is still within its timeout window.
                continue;
            }

            RemotingHelper.closeChannel(liveEntry.getValue().getChannel());
            log.warn("The broker channel expired, {} {}ms", liveEntry.getKey(), timeoutMillis);
            this.onChannelDestroy(liveEntry.getKey());
        }
    } catch (Exception e) {
        log.error("scanNotActiveBroker exception", e);
    }
}

/**
 * Builds an unregister request for the given broker address under the read lock and, when
 * setupUnRegisterRequest reports that one is needed, submits it for asynchronous processing.
 *
 * <p>The read lock is acquired before the {@code try}/{@code finally} that releases it, so
 * {@code unlock()} only runs when the lock is actually held. Previously an interrupt while
 * waiting for the lock made the {@code finally} block throw {@code IllegalMonitorStateException},
 * which replaced the {@code InterruptedException} in the log and dropped the interrupt status;
 * now the interrupt itself is logged and the flag is restored.
 *
 * @param brokerAddrInfo address of the broker whose channel went away; null is ignored
 */
public void onChannelDestroy(BrokerAddrInfo brokerAddrInfo) {
    UnRegisterBrokerRequestHeader unRegisterRequest = new UnRegisterBrokerRequestHeader();
    boolean needUnRegister = false;
    if (brokerAddrInfo != null) {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                needUnRegister = setupUnRegisterRequest(unRegisterRequest, brokerAddrInfo);
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            // Lock was never acquired: restore the interrupt status for the calling thread.
            Thread.currentThread().interrupt();
            log.error("onChannelDestroy Exception", e);
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (needUnRegister) {
        boolean result = this.submitUnRegisterBrokerRequest(unRegisterRequest);
        log.info("the broker's channel destroyed, submit the unregister request at once, " +
            "broker info: {}, submit result: {}", unRegisterRequest, result);
    }
}

3. ServiceThread线程类详解

3.1 ServiceThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/**
 * Base class for a restartable background service that runs its own {@link Runnable#run()}
 * on a dedicated thread. Subclasses supply the service name and the run loop; this class
 * provides start/shutdown bookkeeping and a timed wait that wakeup() can cut short.
 */
public abstract class ServiceThread implements Runnable {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

// Default upper bound, in milliseconds, that shutdown() waits in Thread.join
private static final long JOIN_TIME = 90 * 1000;

// Thread created by the most recent start(); null until start() has run
protected Thread thread;
// Latch that waitForRunning() waits on and wakeup() counts down
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
// True while a wakeup is pending that waitForRunning() has not consumed yet
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
// Stop flag read by subclasses through isStopped()
protected volatile boolean stopped = false;
// Daemon setting applied to the thread created in start()
protected boolean isDaemon = false;

//Make it able to restart the thread
private final AtomicBoolean started = new AtomicBoolean(false);

public ServiceThread() {

}

/**
 * @return the name given to the service thread and used in log messages
 */
public abstract String getServiceName();

/**
 * Starts the service on a new thread. Does nothing when the service is already started.
 */
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
// Only the caller that flips started from false to true creates the thread
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
log.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
}

/**
 * Shuts the service down without interrupting its thread.
 */
public void shutdown() {
this.shutdown(false);
}

/**
 * Marks the service as stopped, wakes a waiting thread and, for a non-daemon thread, waits
 * up to getJoinTime() milliseconds for it to finish. Does nothing when the service is not started.
 *
 * @param interrupt whether to also interrupt the service thread
 */
public void shutdown(final boolean interrupt) {
log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
// Only the caller that flips started from true to false performs the shutdown
if (!started.compareAndSet(true, false)) {
return;
}
this.stopped = true;
log.info("shutdown thread[{}] interrupt={} ", getServiceName(), interrupt);

//if the thread is waiting, wake it up
wakeup();

try {
if (interrupt) {
this.thread.interrupt();
}

long beginTime = System.currentTimeMillis();
// A daemon thread is not joined
if (!this.thread.isDaemon()) {
this.thread.join(this.getJoinTime());
}
long elapsedTime = System.currentTimeMillis() - beginTime;
log.info("join thread[{}], elapsed time: {}ms, join time:{}ms", getServiceName(), elapsedTime, this.getJoinTime());
} catch (InterruptedException e) {
// NOTE(review): the interrupt status of the calling thread is not restored here
log.error("Interrupted", e);
}
}

/**
 * @return the maximum time, in milliseconds, shutdown() waits for the thread to finish
 */
public long getJoinTime() {
return JOIN_TIME;
}

/**
 * Sets the stop flag without waking or joining the thread. Does nothing when not started.
 */
public void makeStop() {
if (!started.get()) {
return;
}
this.stopped = true;
log.info("makestop thread[{}] ", this.getServiceName());
}

/**
 * Records a pending notification and releases a thread blocked in waitForRunning().
 * Repeated calls before the notification is consumed have no further effect.
 */
public void wakeup() {
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}

/**
 * Waits up to {@code interval} milliseconds, or returns at once when a wakeup is already
 * pending. onWaitEnd() is called on every return path.
 *
 * @param interval maximum wait time in milliseconds
 */
protected void waitForRunning(long interval) {
// A wakeup arrived before we started waiting: consume it and skip the wait
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}

//entry to wait
waitPoint.reset();

try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
// Clear any notification that ended the wait before reporting the wait end
hasNotified.set(false);
this.onWaitEnd();
}
}

/**
 * Hook invoked at the end of every waitForRunning() call; empty by default.
 */
protected void onWaitEnd() {
}

/**
 * @return true once shutdown() or makeStop() has set the stop flag
 */
public boolean isStopped() {
return stopped;
}

/**
 * @return the daemon setting that start() applies to the service thread
 */
public boolean isDaemon() {
return isDaemon;
}

/**
 * @param daemon daemon setting to apply to the thread created by the next start()
 */
public void setDaemon(boolean daemon) {
isDaemon = daemon;
}
}

3.2 BatchUnregistrationService

单线程永动机,从阻塞队列unregistrationQueue获取元素后,从路由信息中剔除已经失效的broker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * Service thread that drains queued broker unregister requests and applies them to the
 * RouteInfoManager in batches.
 */
public class BatchUnregistrationService extends ServiceThread {
    private final RouteInfoManager routeInfoManager;
    private BlockingQueue<UnRegisterBrokerRequestHeader> unregistrationQueue;
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    /**
     * @param routeInfoManager manager that performs the actual unregistration
     * @param namesrvConfig supplies the capacity of the bounded request queue
     */
    public BatchUnregistrationService(RouteInfoManager routeInfoManager, NamesrvConfig namesrvConfig) {
        this.routeInfoManager = routeInfoManager;
        this.unregistrationQueue = new LinkedBlockingQueue<>(namesrvConfig.getUnRegisterBrokerQueueCapacity());
    }

    /**
     * Queues an unregister request without blocking.
     *
     * @param unRegisterRequest request to queue
     * @return whether the queue accepted the request
     */
    public boolean submit(UnRegisterBrokerRequestHeader unRegisterRequest) {
        return unregistrationQueue.offer(unRegisterRequest);
    }

    @Override
    public String getServiceName() {
        return BatchUnregistrationService.class.getName();
    }

    /**
     * Worker loop: blocks for one request, adds everything else queued at that moment to the
     * same batch, and passes the batch to RouteInfoManager#unRegisterBroker. Any Throwable is
     * logged and the loop continues until isStopped() reports true.
     */
    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
                final UnRegisterBrokerRequestHeader head = unregistrationQueue.take();
                Set<UnRegisterBrokerRequestHeader> batch = new HashSet<>();
                unregistrationQueue.drainTo(batch);

                // The blocking take() already removed the head request, so add it back to the batch.
                batch.add(head);

                this.routeInfoManager.unRegisterBroker(batch);
            } catch (Throwable e) {
                log.error("Handle unregister broker request failed", e);
            }
        }
    }

    // For test only
    int queueLength() {
        return this.unregistrationQueue.size();
    }
}

4. 总结

RPC 服务 + HashMap 存储容器 + 读写锁 + 定时任务

1、NameServer 监听固定的端口,提供 RPC 服务;

2、HashMap 作为存储容器;

3、读写锁控制锁的颗粒度;

4、定时任务:每个 Broker 每隔 30 秒向所有 NameServer 注册主题的路由信息;NameServer 的定时任务每隔 5 秒清除已宕机的 Broker,判断宕机的标准是:当前时间减去 Broker 最后一次心跳时间大于 2 分钟。


本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.