1. Broker启动流程

1.1 BrokerStartup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static void main(String[] args) {

start(createBrokerController(args));
}

public static BrokerController createBrokerController(String[] args) {
try {
// 1. 创建brokerController
BrokerController controller = buildBrokerController(args);
// 2. 初始化controller
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new Thread(buildShutdownHook(controller)));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}

public static BrokerController start(BrokerController controller) {
try {
// 3. 启动controller
controller.start();

String tip = String.format("The broker[%s, %s] boot success. serializeType=%s",
controller.getBrokerConfig().getBrokerName(), controller.getBrokerAddr(),
RemotingCommand.getSerializeTypeConfigInThisServer());

if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
}

log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}

1.2 BrokerController

1.2.1 主流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// 1. 创建 
BrokerController controller = new BrokerController(
brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig, authConfig);

// 2. 初始化
public boolean initialize() throws CloneNotSupportedException {

boolean result = this.initializeMetadata();
if (!result) {
return false;
}

result = this.initializeMessageStore();
if (!result) {
return false;
}

return this.recoverAndInitService();
}

// 3. 启动
public void start() throws Exception {

this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();

if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
isIsolated = true;
}

if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}

startBasicService();

if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}

scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run0() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
return;
}
if (isIsolated) {
BrokerController.LOG.info("Skip register for broker is isolated");
return;
}
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
BrokerController.LOG.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));

if (this.brokerConfig.isEnableSlaveActingMaster()) {
scheduleSendHeartbeat();

scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run0() {
try {
BrokerController.this.syncBrokerMemberGroup();
} catch (Throwable e) {
BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
}
}
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
}

if (this.brokerConfig.isEnableControllerMode()) {
scheduleSendHeartbeat();
}

if (brokerConfig.isSkipPreOnline()) {
startServiceWithoutCondition();
}

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.refreshMetadata();
} catch (Exception e) {
LOG.error("ScheduledTask refresh metadata exception", e);
}
}
}, 10, 5, TimeUnit.SECONDS);
}

1.2.2 初始化

初始化元数据、初始化消息存储、恢复和初始化服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// a. 
public boolean initializeMetadata() {
boolean result = true;
if (null != configStorage) {
result = configStorage.start();
}
result = result && this.topicConfigManager.load();
result = result && this.topicQueueMappingManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
result = result && this.consumerOrderInfoManager.load();
return result;
}

// b.
public boolean initializeMessageStore() {
boolean result = true;
try {
DefaultMessageStore defaultMessageStore;
if (this.messageStoreConfig.isEnableRocksDBStore()) {
defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
} else {
defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
defaultMessageStore.enableRocksdbCQWrite();
}
}

if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler =
new DLedgerRoleChangeHandler(this, defaultMessageStore);
((DLedgerCommitLog) defaultMessageStore.getCommitLog())
.getdLedgerServer().getDLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}

this.brokerStats = new BrokerStats(defaultMessageStore);

// Load store plugin
MessageStorePluginContext context = new MessageStorePluginContext(
messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, configuration);
this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
if (messageStoreConfig.isTimerWheelEnable()) {
this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
} catch (Exception e) {
result = false;
LOG.error("BrokerController#initialize: unexpected error occurs", e);
}
return result;
}

// c.
public boolean recoverAndInitService() throws CloneNotSupportedException {

boolean result = true;

if (this.brokerConfig.isEnableControllerMode()) {
this.replicasManager = new ReplicasManager(this);
this.replicasManager.setFenced(true);
}

if (messageStore != null) {
registerMessageStoreHook();
result = this.messageStore.load();
}

if (messageStoreConfig.isTimerWheelEnable()) {
result = result && this.timerMessageStore.load();
}

//scheduleMessageService load after messageStore load success
result = result && this.scheduleMessageService.load();

for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
if (brokerAttachedPlugin != null) {
result = result && brokerAttachedPlugin.load();
}
}

this.brokerMetricsManager = new BrokerMetricsManager(this);

if (result) {

initializeRemotingServer();

initializeResources();

registerProcessor();

initializeScheduledTasks();

initialTransaction();

initialAcl();

initialRpcHooks();

initialRequestPipeline();

if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;

@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
LOG.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
LOG.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}

private void reloadServerSslContext() {
for (Map.Entry<String, RemotingServer> entry : remotingServerMap.entrySet()) {
RemotingServer remotingServer = entry.getValue();
if (remotingServer instanceof NettyRemotingServer) {
((NettyRemotingServer) remotingServer).loadSslContext();
}
}
}
});
} catch (Exception e) {
result = false;
LOG.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
}

return result;
}

1.2.3 启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public void start() throws Exception {

this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();

if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
isIsolated = true;
}
// 1.
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
// 2.
startBasicService();

// 3.
if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}

// 4.
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run0() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
return;
}
if (isIsolated) {
BrokerController.LOG.info("Skip register for broker is isolated");
return;
}
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
BrokerController.LOG.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));

// 5.
if (this.brokerConfig.isEnableSlaveActingMaster()) {
scheduleSendHeartbeat();

scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run0() {
try {
BrokerController.this.syncBrokerMemberGroup();
} catch (Throwable e) {
BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
}
}
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
}

// 6.
if (this.brokerConfig.isEnableControllerMode()) {
scheduleSendHeartbeat();
}

// 7.
if (brokerConfig.isSkipPreOnline()) {
startServiceWithoutCondition();
}

// 8.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.refreshMetadata();
} catch (Exception e) {
LOG.error("ScheduledTask refresh metadata exception", e);
}
}
}, 10, 5, TimeUnit.SECONDS);
}
1.2.3.1 brokerOuterAPI.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public void start() {
if (this.defaultEventExecutorGroup == null) {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactoryImpl("NettyClientWorkerThread_"));
}
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
LOGGER.info("Prepend SSL handler");
} else {
LOGGER.warn("Connections are insecure as SSLContext is null!");
}
}
ch.pipeline().addLast(
nettyClientConfig.isDisableNettyWorkerGroup() ? null : defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
LOGGER.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
}
if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
LOGGER.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
}
if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
LOGGER.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
}
if (nettyClientConfig.isClientPooledByteBufAllocatorEnable()) {
handler.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}

nettyEventExecutor.start();

TimerTask timerTaskScanResponseTable = new TimerTask() {
@Override
public void run(Timeout timeout) {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
LOGGER.error("scanResponseTable exception", e);
} finally {
timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS);
}
}
};
this.timer.newTimeout(timerTaskScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS);

if (nettyClientConfig.isScanAvailableNameSrv()) {
int connectTimeoutMillis = this.nettyClientConfig.getConnectTimeoutMillis();
TimerTask timerTaskScanAvailableNameSrv = new TimerTask() {
@Override
public void run(Timeout timeout) {
try {
NettyRemotingClient.this.scanAvailableNameSrv();
} catch (Exception e) {
LOGGER.error("scanAvailableNameSrv exception", e);
} finally {
timer.newTimeout(this, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
}
};
this.timer.newTimeout(timerTaskScanAvailableNameSrv, 0, TimeUnit.MILLISECONDS);
}


}
1.2.3.2 startBasicService()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
protected void startBasicService() throws Exception {

if (this.messageStore != null) {
this.messageStore.start();
}

if (this.timerMessageStore != null) {
this.timerMessageStore.start();
}

if (this.replicasManager != null) {
this.replicasManager.start();
}

if (remotingServerStartLatch != null) {
remotingServerStartLatch.await();
}

for (Map.Entry<String, RemotingServer> entry : remotingServerMap.entrySet()) {
RemotingServer remotingServer = entry.getValue();
if (remotingServer != null) {
remotingServer.start();

if (TCP_REMOTING_SERVER.equals(entry.getKey())) {
// In test scenarios where it is up to OS to pick up an available port, set the listening port back to config
if (null != nettyServerConfig && 0 == nettyServerConfig.getListenPort()) {
nettyServerConfig.setListenPort(remotingServer.localListenPort());
}
}
}
}

this.storeHost = new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort());

for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
if (brokerAttachedPlugin != null) {
brokerAttachedPlugin.start();
}
}

if (this.popMessageProcessor != null) {
this.popMessageProcessor.getPopLongPollingService().start();
if (brokerConfig.isPopConsumerFSServiceInit()) {
this.popMessageProcessor.getPopBufferMergeService().start();
}
this.popMessageProcessor.getQueueLockManager().start();
}

if (this.ackMessageProcessor != null) {
if (brokerConfig.isPopConsumerFSServiceInit()) {
this.ackMessageProcessor.startPopReviveService();
}
}

if (this.notificationProcessor != null) {
this.notificationProcessor.getPopLongPollingService().start();
}

if (this.popConsumerService != null) {
this.popConsumerService.start();
}

if (this.topicQueueMappingCleanService != null) {
this.topicQueueMappingCleanService.start();
}

if (this.fileWatchService != null) {
this.fileWatchService.start();
}

if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}

if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}

if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}

if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}

if (this.broadcastOffsetManager != null) {
this.broadcastOffsetManager.start();
}

if (this.escapeBridge != null) {
this.escapeBridge.start();
}

if (this.topicRouteInfoManager != null) {
this.topicRouteInfoManager.start();
}

if (this.brokerPreOnlineService != null) {
this.brokerPreOnlineService.start();
}

if (this.coldDataPullRequestHoldService != null) {
this.coldDataPullRequestHoldService.start();
}

if (this.coldDataCgCtrService != null) {
this.coldDataCgCtrService.start();
}
}
1.2.3.3 registerBrokerAll(true, false, true)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// 1. 
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
ConcurrentMap<String, TopicConfig> topicConfigMap = this.getTopicConfigManager().getTopicConfigTable();
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();

for (TopicConfig topicConfig : topicConfigMap.values()) {
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
topicConfigTable.put(topicConfig.getTopicName(),
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
topicConfig.getPerm() & getBrokerConfig().getBrokerPermission()));
} else {
topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}

if (this.brokerConfig.isEnableSplitRegistration()
&& topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) {
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable);
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
topicConfigTable.clear();
}
}

Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream()
.map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().
buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap);
if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isInBrokerContainer())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}

// 2.
protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {

if (shutdown) {
BrokerController.LOG.info("BrokerController#doRegisterBrokerAll: broker has shutdown, no need to register any more.");
return;
}
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
Lists.newArrayList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isEnableSlaveActingMaster(),
this.brokerConfig.isCompressedRegister(),
this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
this.getBrokerIdentity());

handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
}

// 3.
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean enableActingMaster,
final boolean compressed,
final Long heartbeatTimeoutMillis,
final BrokerIdentity brokerIdentity) {

final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setEnableActingMaster(enableActingMaster);
requestHeader.setCompressed(false);
if (heartbeatTimeoutMillis != null) {
requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);
}

RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
@Override
public void run0() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}

LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
} catch (Exception e) {
LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}

try {
if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
}
} catch (InterruptedException ignore) {
}
}

return registerBrokerResultList;
}

protected void handleRegisterBrokerResult(List<RegisterBrokerResult> registerBrokerResultList,
boolean checkOrderConfig) {
for (RegisterBrokerResult registerBrokerResult : registerBrokerResultList) {
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
this.messageStore.updateMasterAddress(registerBrokerResult.getMasterAddr());
}

this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
break;
}
}
}
1.2.3.4 启动定时任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 1. 30s向namesrv注册一次
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run0() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
return;
}
if (isIsolated) {
BrokerController.LOG.info("Skip register for broker is isolated");
return;
}
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
BrokerController.LOG.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));

// 2.
if (this.brokerConfig.isEnableSlaveActingMaster()) {
scheduleSendHeartbeat();
// 1s同步一次broker组信息
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run0() {
try {
BrokerController.this.syncBrokerMemberGroup();
} catch (Throwable e) {
BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
}
}
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
}

// 3. 1s发送一次心跳到namesrv
if (this.brokerConfig.isEnableControllerMode()) {
scheduleSendHeartbeat();
}

// 4. 无条件启动broker注册
if (brokerConfig.isSkipPreOnline()) {
startServiceWithoutCondition();
}

// 5. 5s更新一次broker元数据
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.refreshMetadata();
} catch (Exception e) {
LOG.error("ScheduledTask refresh metadata exception", e);
}
}
}, 10, 5, TimeUnit.SECONDS);

2. 远程通讯服务

![image-20250413223231276](/Users/kakalong/Library/Application Support/typora-user-images/image-20250413223231276.png)

2.1

3.

4.


本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.