1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
| public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { ConcurrentMap<String, TopicConfig> topicConfigMap = this.getTopicConfigManager().getTopicConfigTable(); ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
for (TopicConfig topicConfig : topicConfigMap.values()) { if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { topicConfigTable.put(topicConfig.getTopicName(), new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), topicConfig.getPerm() & getBrokerConfig().getBrokerPermission())); } else { topicConfigTable.put(topicConfig.getTopicName(), topicConfig); }
if (this.brokerConfig.isEnableSplitRegistration() && topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) { TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable); doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); topicConfigTable.clear(); } }
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream() .map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager(). buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap); if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills(), this.brokerConfig.isInBrokerContainer())) { doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); } }
/**
 * Sends one registration request for the given topic configuration through
 * {@code brokerOuterAPI} and passes the returned results to
 * {@code handleRegisterBrokerResult}.
 *
 * <p>Does nothing except log when the {@code shutdown} flag is set.
 *
 * @param checkOrderConfig  forwarded to {@code handleRegisterBrokerResult}
 * @param oneway            forwarded to {@code brokerOuterAPI.registerBrokerAll}
 * @param topicConfigWrapper topic configuration payload to register
 */
protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) {
    if (shutdown) {
        BrokerController.LOG.info("BrokerController#doRegisterBrokerAll: broker has shutdown, no need to register any more.");
        return;
    }

    // The heartbeat timeout is only supplied when slave-acting-master is enabled.
    Long heartbeatTimeoutMillis = null;
    if (this.brokerConfig.isEnableSlaveActingMaster()) {
        heartbeatTimeoutMillis = this.brokerConfig.getBrokerNotActiveTimeoutMillis();
    }

    final List<RegisterBrokerResult> results = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        Lists.newArrayList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isEnableSlaveActingMaster(),
        this.brokerConfig.isCompressedRegister(),
        heartbeatTimeoutMillis,
        this.getBrokerIdentity());

    handleRegisterBrokerResult(results, checkOrderConfig);
}
/**
 * Registers the broker with every name server returned by
 * {@code remotingClient.getAvailableNameSrvList()}.
 *
 * <p>One task per name server is submitted to {@code brokerOuterExecutor}; all tasks share
 * the same request header and encoded body. The calling thread then waits up to
 * {@code timeoutMills} for the tasks to finish. Results that arrive after the wait ends
 * may still be appended to the returned list, which is a {@link CopyOnWriteArrayList}.
 *
 * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the
 * thread's interrupt status is restored, and the results collected so far are returned.
 *
 * @param clusterName            cluster name placed in the request header
 * @param brokerAddr             broker address placed in the request header
 * @param brokerName             broker name placed in the request header
 * @param brokerId               broker id placed in the request header
 * @param haServerAddr           HA server address placed in the request header
 * @param topicConfigWrapper     topic configuration carried in the request body
 * @param filterServerList       filter server list carried in the request body
 * @param oneway                 forwarded to {@code registerBroker}
 * @param timeoutMills           per-request timeout and the overall wait deadline, in milliseconds
 * @param enableActingMaster     placed in the request header
 * @param compressed             passed to {@code RegisterBrokerBody.encode}
 * @param heartbeatTimeoutMillis placed in the request header when non-null
 * @param brokerIdentity         identity handed to each {@code AbstractBrokerRunnable}
 * @return the non-null results collected from the name servers; empty when no name server
 *         is available
 */
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean enableActingMaster,
    final boolean compressed,
    final Long heartbeatTimeoutMillis,
    final BrokerIdentity brokerIdentity) {

    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    final List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
    if (nameServerAddressList == null || nameServerAddressList.isEmpty()) {
        return registerBrokerResultList;
    }

    final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);
    requestHeader.setEnableActingMaster(enableActingMaster);
    // NOTE(review): the header always carries compressed=false while the body below is
    // encoded with the caller-supplied 'compressed' flag. Left unchanged here; confirm
    // that the receiver does not rely on the header flag to decode the body.
    requestHeader.setCompressed(false);
    if (heartbeatTimeoutMillis != null) {
        requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);
    }

    final RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
    requestBody.setFilterServerList(filterServerList);
    final byte[] body = requestBody.encode(compressed);
    final int bodyCrc32 = UtilAll.crc32(body);
    requestHeader.setBodyCrc32(bodyCrc32);

    final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
            @Override
            public void run0() {
                try {
                    RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                    if (result != null) {
                        registerBrokerResultList.add(result);
                    }
                    LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
                } catch (Exception e) {
                    LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
                } finally {
                    // Always release the latch so the waiting caller is not held until the deadline.
                    countDownLatch.countDown();
                }
            }
        });
    }

    try {
        if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
            LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers can observe the interruption
        // instead of having it silently discarded.
        Thread.currentThread().interrupt();
        LOGGER.warn("Interrupted while waiting for broker registration to complete", e);
    }

    return registerBrokerResultList;
}
/**
 * Applies the first non-null registration result in the list and ignores the rest.
 *
 * <p>For that result: when {@code updateMasterHAServerAddrPeriodically} is set and the
 * result carries an HA server address, the message store's HA master address and master
 * address are updated. The slave synchronizer's master address is always updated. When
 * {@code checkOrderConfig} is {@code true}, the order topic config is updated from the
 * result's KV table.
 *
 * @param registerBrokerResultList results to scan; entries may be {@code null}
 * @param checkOrderConfig         whether to update the order topic config from the result
 */
protected void handleRegisterBrokerResult(List<RegisterBrokerResult> registerBrokerResultList, boolean checkOrderConfig) {
    for (RegisterBrokerResult candidate : registerBrokerResultList) {
        if (candidate == null) {
            continue;
        }

        final boolean refreshHaAddresses =
            this.updateMasterHAServerAddrPeriodically && candidate.getHaServerAddr() != null;
        if (refreshHaAddresses) {
            this.messageStore.updateHaMasterAddress(candidate.getHaServerAddr());
            this.messageStore.updateMasterAddress(candidate.getMasterAddr());
        }

        this.slaveSynchronize.setMasterAddr(candidate.getMasterAddr());

        if (checkOrderConfig) {
            this.getTopicConfigManager().updateOrderTopicConfig(candidate.getKvTable());
        }

        // Only the first usable result is applied.
        return;
    }
}
|