1. Synchronous and Asynchronous Replication
In RocketMQ's cluster mode, Brokers are divided into Masters and Slaves. One Master can have multiple Slaves, but each Slave belongs to exactly one Master. Every Broker maintains long-lived connections to all nodes of the Name Server cluster and periodically registers its Topic information with every Name Server.
The Master node receives write requests from clients and persists the messages to disk, while the Slave node replicates the message data from the Master and keeps itself in sync with it.
1.1 Synchronous Replication
Each Master is paired with a Slave, and a cluster contains multiple Master-Slave pairs. HA uses synchronous double-write: a write is acknowledged to the application only after both the master and the slave have written it successfully.
The pros and cons of this mode:
Pros: there is no single point of failure for either data or service. If the Master goes down, messages are not delayed, and both service availability and data availability are very high.
Cons: performance is slightly lower than asynchronous replication (roughly 10% lower), the RT of sending a single message is slightly higher, and in the current version the slave cannot automatically take over as master after the master goes down.
1.2 Asynchronous Replication
Each Master is paired with a Slave, and a cluster contains multiple Master-Slave pairs. HA uses asynchronous replication, so the slave lags the master by a short interval (milliseconds). The pros and cons of this mode are as follows (a broker configuration sketch covering both modes appears after the list):
Pros: even if a disk is damaged, very few messages are lost and message delivery latency is unaffected. When the Master goes down, consumers can still consume from the Slave; this is transparent to the application and requires no manual intervention. Performance is almost identical to the multi-Master mode.
Cons: if the Master goes down with a damaged disk, a small number of messages will be lost.
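Which mode a broker uses is decided by its configuration file. The snippet below is only an illustrative broker.conf sketch, not taken from this article: brokerRole, brokerId, and flushDiskType are standard RocketMQ broker properties, but the concrete values and file layout of a real deployment will differ.

    # master broker (illustrative values)
    brokerClusterName=DefaultCluster
    brokerName=broker-a
    brokerId=0
    # SYNC_MASTER = synchronous replication (1.1); ASYNC_MASTER = asynchronous replication (1.2)
    brokerRole=SYNC_MASTER
    flushDiskType=ASYNC_FLUSH

    # slave broker paired with it
    brokerClusterName=DefaultCluster
    brokerName=broker-a
    brokerId=1
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH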
The replication process consists of two parts: metadata replication and message data replication.
The master and slave synchronize topics, consumer offsets, delayed-consumption offsets, and consumer configuration data.
The master and slave synchronize message data.
2. Metadata Replication
A scheduled task on the Slave Broker synchronizes metadata every 10 seconds, including topics, consumption offsets, delayed-consumption offsets, and consumer configuration.
    public void syncAll() {
        this.syncTopicConfig();
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
    }
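The 10-second cadence comes from a periodic task registered when the slave broker starts. The class below is only a sketch of that wiring using a plain ScheduledExecutorService; the real registration lives in BrokerController and its exact shape varies between RocketMQ versions.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Sketch only: drive a metadata sync every 10 seconds.
    // The Runnable stands in for SlaveSynchronize#syncAll shown above.
    public class MetadataSyncScheduler {

        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        public void start(Runnable syncAll) {
            // initial delay 10s, then every 10s, mirroring the cadence described in the text
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    syncAll.run();
                } catch (Throwable t) {
                    t.printStackTrace(); // the broker logs this instead
                }
            }, 10, 10, TimeUnit.SECONDS);
        }

        public void shutdown() {
            scheduler.shutdownNow();
        }
    }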
When synchronizing topics, the Slave Broker sends an RPC request to the Master Broker; once the data comes back, it is first placed in the local cache and then persisted to local disk.
    private void syncTopicConfig() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                TopicConfigSerializeWrapper topicWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
                if (!this.brokerController.getTopicConfigManager().getDataVersion()
                    .equals(topicWrapper.getDataVersion())) {

                    this.brokerController.getTopicConfigManager().getDataVersion()
                        .assignNewOne(topicWrapper.getDataVersion());
                    this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                    this.brokerController.getTopicConfigManager().getTopicConfigTable()
                        .putAll(topicWrapper.getTopicConfigTable());
                    this.brokerController.getTopicConfigManager().persist();

                    log.info("Update slave topic config from master, {}", masterAddrBak);
                }
            } catch (Exception e) {
                log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
            }
        }
    }
3. Message Data Replication
The diagram below shows the overall flow of message data synchronization between the Master and the Slave.
3.1 The Master listens on the designated port after startup
After the Master starts, it creates the AcceptSocketService, which accepts TCP connections from clients (Slaves) to the server.
    class AcceptSocketService extends ServiceThread {
        private final SocketAddress socketAddressListen;
        private ServerSocketChannel serverSocketChannel;
        private Selector selector;

        public AcceptSocketService(final int port) {
            this.socketAddressListen = new InetSocketAddress(port);
        }

        public void beginAccept() throws Exception {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.selector = RemotingUtil.openSelector();
            this.serverSocketChannel.socket().setReuseAddress(true);
            this.serverSocketChannel.socket().bind(this.socketAddressListen);
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        }

        @Override
        public void shutdown(final boolean interrupt) {
            super.shutdown(interrupt);
            try {
                this.serverSocketChannel.close();
                this.selector.close();
            } catch (IOException e) {
                log.error("AcceptSocketService shutdown exception", e);
            }
        }

        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    Set<SelectionKey> selected = this.selector.selectedKeys();
                    if (selected != null) {
                        for (SelectionKey k : selected) {
                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                                if (sc != null) {
                                    HAService.log.info("HAService receive new connection, "
                                        + sc.socket().getRemoteSocketAddress());
                                    try {
                                        HAConnection conn = new HAConnection(HAService.this, sc);
                                        conn.start();
                                        HAService.this.addConnection(conn);
                                    } catch (Exception e) {
                                        log.error("new HAConnection exception", e);
                                        sc.close();
                                    }
                                }
                            } else {
                                log.warn("Unexpected ops in select " + k.readyOps());
                            }
                        }
                        selected.clear();
                    }
                } catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return AcceptSocketService.class.getSimpleName();
        }
    }
RocketMQ abstracts each accepted connection as an HAConnection object. HAConnection starts two threads, one for the read service and one for the write service:
Read service: handles requests sent by the Slave.
Write service: transfers data to the Slave.
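To make this per-connection threading model concrete, here is a heavily simplified sketch: it is not the real HAConnection source, only an illustration of one connection owning a read thread and a write thread over the same SocketChannel.

    import java.nio.channels.SocketChannel;

    // Simplified illustration (not RocketMQ source): each accepted slave connection
    // gets its own pair of service threads sharing one SocketChannel.
    public class HaConnectionSketch {

        private final SocketChannel socketChannel;
        private final Thread readService;   // handles offsets reported by the slave
        private final Thread writeService;  // pushes commitlog data to the slave

        public HaConnectionSketch(SocketChannel socketChannel,
                                  Runnable readLoop,
                                  Runnable writeLoop) {
            this.socketChannel = socketChannel;
            this.readService = new Thread(readLoop, "ReadSocketService");
            this.writeService = new Thread(writeLoop, "WriteSocketService");
        }

        public void start() {
            this.readService.start();
            this.writeService.start();
        }
    }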
3.2 After startup, the Slave tries to connect to the Master and establish a TCP connection
HAClient is the core class on the Slave side; it is responsible for creating the connection to the Master and exchanging data with it.
    private boolean connectMaster() throws ClosedChannelException {
        if (null == socketChannel) {
            String addr = this.masterAddress.get();
            if (addr != null) {
                SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                if (socketAddress != null) {
                    this.socketChannel = RemotingUtil.connect(socketAddress);
                    if (this.socketChannel != null) {
                        this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                    }
                }
            }

            this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            this.lastWriteTimestamp = System.currentTimeMillis();
        }

        return this.socketChannel != null;
    }
After the client starts, it first tries to connect to the Master, then looks up the maximum physical offset in its local message store and records it in the variable currentReportedOffset.
3.3 The Slave reports its pull offset to the Master
    if (this.isTimeToReportOffset()) {
        boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        if (!result) {
            this.closeMaster();
        }
    }
The reported progress is just a Long offset of 8 bytes, a very compact format.
    private boolean reportSlaveMaxOffset(final long maxOffset) {
        this.reportOffset.position(0);
        this.reportOffset.limit(8);
        this.reportOffset.putLong(maxOffset);
        this.reportOffset.position(0);
        this.reportOffset.limit(8);

        for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
            try {
                this.socketChannel.write(this.reportOffset);
            } catch (IOException e) {
                log.error(this.getServiceName()
                    + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                return false;
            }
        }

        lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
        return !this.reportOffset.hasRemaining();
    }
After the offset has been written to the socket buffer, the last write time lastWriteTimestamp is updated.
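lastWriteTimestamp is what drives the periodic report: isTimeToReportOffset compares it against the configured heartbeat interval. The method below is a sketch of that check inside HAClient, reconstructed from the fields and getters that appear elsewhere in this article, so treat it as illustrative rather than a verbatim quote.

    // Sketch: report again only once the heartbeat interval has elapsed since the last write.
    private boolean isTimeToReportOffset() {
        long interval = HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
        return interval > HAService.this.defaultMessageStore
            .getMessageStoreConfig().getHaSendHeartbeatInterval();
    }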
3.4 The Master parses the requested offset and retrieves all messages after that offset from the message files
When the Slave reports data to the Master, a SelectionKey.OP_READ event is triggered and the Master hands the request to the ReadSocketService:
    public void run() {
        HAConnection.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                boolean ok = this.processReadEvent();
                if (!ok) {
                    HAConnection.log.error("processReadEvent error");
                    break;
                }

                long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
                    - this.lastReadTimestamp;
                if (interval > HAConnection.this.haService.getDefaultMessageStore()
                    .getMessageStoreConfig().getHaHousekeepingInterval()) {
                    log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr
                        + "] expired, " + interval);
                    break;
                }
            } catch (Exception e) {
                HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                break;
            }
        }

        this.makeStop();
        writeSocketService.makeStop();
        haService.removeConnection(HAConnection.this);
        HAConnection.this.haService.getConnectionCount().decrementAndGet();

        SelectionKey sk = this.socketChannel.keyFor(this.selector);
        if (sk != null) {
            sk.cancel();
        }

        try {
            this.selector.close();
            this.socketChannel.close();
        } catch (IOException e) {
            HAConnection.log.error("", e);
        }

        HAConnection.log.info(this.getServiceName() + " service end");
    }
When the Slave Broker reports the maxPhyOffset of its own commitlog, the Master immediately returns from selector.select(1000) and executes the processReadEvent method.
    private boolean processReadEvent() {
        int readSizeZeroTimes = 0;

        if (!this.byteBufferRead.hasRemaining()) {
            this.byteBufferRead.flip();
            this.processPosition = 0;
        }

        while (this.byteBufferRead.hasRemaining()) {
            try {
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    readSizeZeroTimes = 0;
                    this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore()
                        .getSystemClock().now();
                    if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                        int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                        long readOffset = this.byteBufferRead.getLong(pos - 8);
                        this.processPosition = pos;

                        HAConnection.this.slaveAckOffset = readOffset;
                        if (HAConnection.this.slaveRequestOffset < 0) {
                            HAConnection.this.slaveRequestOffset = readOffset;
                            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                        } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService
                            .getDefaultMessageStore().getMaxPhyOffset()) {
                            log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
                                HAConnection.this.clientAddr,
                                HAConnection.this.slaveAckOffset,
                                HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
                            return false;
                        }

                        HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                    }
                } else if (readSize == 0) {
                    if (++readSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                    return false;
                }
            } catch (IOException e) {
                log.error("processReadEvent exception", e);
                return false;
            }
        }

        return true;
    }
The core logic of processReadEvent is to record the Slave's current progress offset and then notify the replication side of the current replication progress.
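notifyTransferSome is how that notification happens: it advances the "replicated up to" watermark (the push2SlaveMaxOffset that doWaitTransfer in section 4 compares against) and wakes up any waiting producer requests. The standalone class below is only a sketch of that idea, not the RocketMQ method itself.

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the idea behind notifyTransferSome: monotonically raise the
    // slave-acknowledged offset and wake up waiters whenever it moves forward.
    public class TransferProgress {

        private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);

        public void notifyTransferSome(final long offset) {
            for (long value = push2SlaveMaxOffset.get(); offset > value; value = push2SlaveMaxOffset.get()) {
                if (push2SlaveMaxOffset.compareAndSet(value, offset)) {
                    synchronized (this) {
                        this.notifyAll();   // the real code wakes the group transfer service instead
                    }
                    break;
                }
            }
        }

        public long current() {
            return push2SlaveMaxOffset.get();
        }
    }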
The write service WriteSocketService retrieves all messages after that offset from the message files (subject to the transfer batch size limit) and sends the message data to the Slave.
    public void run() {
        HAConnection.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.selector.select(1000);

                if (-1 == HAConnection.this.slaveRequestOffset) {
                    Thread.sleep(10);
                    continue;
                }

                if (-1 == this.nextTransferFromWhere) {
                    if (0 == HAConnection.this.slaveRequestOffset) {
                        long masterOffset = HAConnection.this.haService.getDefaultMessageStore()
                            .getCommitLog().getMaxOffset();
                        masterOffset = masterOffset
                            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore()
                                .getMessageStoreConfig().getMappedFileSizeCommitLog());

                        if (masterOffset < 0) {
                            masterOffset = 0;
                        }

                        this.nextTransferFromWhere = masterOffset;
                    } else {
                        this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                    }

                    log.info("master transfer data from " + this.nextTransferFromWhere + " to slave["
                        + HAConnection.this.clientAddr + "], and slave request "
                        + HAConnection.this.slaveRequestOffset);
                }

                if (this.lastWriteOver) {
                    long interval = HAConnection.this.haService.getDefaultMessageStore()
                        .getSystemClock().now() - this.lastWriteTimestamp;

                    if (interval > HAConnection.this.haService.getDefaultMessageStore()
                        .getMessageStoreConfig().getHaSendHeartbeatInterval()) {

                        this.byteBufferHeader.position(0);
                        this.byteBufferHeader.limit(headerSize);
                        this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                        this.byteBufferHeader.putInt(0);
                        this.byteBufferHeader.flip();

                        this.lastWriteOver = this.transferData();
                        if (!this.lastWriteOver)
                            continue;
                    }
                } else {
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }

                SelectMappedBufferResult selectResult = HAConnection.this.haService
                    .getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                if (selectResult != null) {
                    int size = selectResult.getSize();
                    if (size > HAConnection.this.haService.getDefaultMessageStore()
                        .getMessageStoreConfig().getHaTransferBatchSize()) {
                        size = HAConnection.this.haService.getDefaultMessageStore()
                            .getMessageStoreConfig().getHaTransferBatchSize();
                    }

                    long thisOffset = this.nextTransferFromWhere;
                    this.nextTransferFromWhere += size;

                    selectResult.getByteBuffer().limit(size);
                    this.selectMappedBufferResult = selectResult;

                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(thisOffset);
                    this.byteBufferHeader.putInt(size);
                    this.byteBufferHeader.flip();

                    this.lastWriteOver = this.transferData();
                } else {
                    HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                }
            } catch (Exception e) {
                HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                break;
            }
        }

        HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

        if (this.selectMappedBufferResult != null) {
            this.selectMappedBufferResult.release();
        }

        this.makeStop();
        readSocketService.makeStop();
        haService.removeConnection(HAConnection.this);

        SelectionKey sk = this.socketChannel.keyFor(this.selector);
        if (sk != null) {
            sk.cancel();
        }

        try {
            this.selector.close();
            this.socketChannel.close();
        } catch (IOException e) {
            HAConnection.log.error("", e);
        }

        HAConnection.log.info(this.getServiceName() + " service end");
    }
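As the header handling above shows, each transfer frame is simply a 12-byte header (8-byte starting physical offset plus 4-byte body size) followed by the raw commitlog bytes, with a zero body size acting as a heartbeat. The self-contained sketch below encodes and decodes such a frame with an NIO ByteBuffer purely to make the layout explicit; it is illustrative, not part of RocketMQ.

    import java.nio.ByteBuffer;

    // Sketch: the wire layout used between master and slave for commitlog transfer.
    // header = 8-byte physical offset + 4-byte body size, then bodySize bytes of commitlog data.
    public final class HaFrames {

        public static final int HEADER_SIZE = 8 + 4;

        public static ByteBuffer encode(long phyOffset, byte[] body) {
            ByteBuffer frame = ByteBuffer.allocate(HEADER_SIZE + body.length);
            frame.putLong(phyOffset);  // where this batch starts in the master commitlog
            frame.putInt(body.length); // 0 means "heartbeat, no data"
            frame.put(body);
            frame.flip();
            return frame;
        }

        public static long readOffset(ByteBuffer frame) {
            return frame.getLong(0);
        }

        public static int readBodySize(ByteBuffer frame) {
            return frame.getInt(8);
        }
    }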
3.5 The Slave receives the data and appends it to the commitlog message file
    private boolean dispatchReadRequest() {
        final int msgHeaderSize = 8 + 4;

        while (true) {
            int diff = this.byteBufferRead.position() - this.dispatchPosition;
            if (diff >= msgHeaderSize) {
                long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

                long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                if (slavePhyOffset != 0) {
                    if (slavePhyOffset != masterPhyOffset) {
                        log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                            + slavePhyOffset + " MASTER: " + masterPhyOffset);
                        return false;
                    }
                }

                if (diff >= (msgHeaderSize + bodySize)) {
                    byte[] bodyData = byteBufferRead.array();
                    int dataStart = this.dispatchPosition + msgHeaderSize;

                    HAService.this.defaultMessageStore.appendToCommitLog(
                        masterPhyOffset, bodyData, dataStart, bodySize);

                    this.dispatchPosition += msgHeaderSize + bodySize;

                    if (!reportSlaveMaxOffsetPlus()) {
                        return false;
                    }

                    continue;
                }
            }

            if (!this.byteBufferRead.hasRemaining()) {
                this.reallocateByteBuffer();
            }

            break;
        }

        return true;
    }
First, the HAClient class calls the dispatchReadRequest method to parse out the message data;
then the message data is appended to the local message store.
    byte[] bodyData = byteBufferRead.array();
    int dataStart = this.dispatchPosition + msgHeaderSize;

    HAService.this.defaultMessageStore.appendToCommitLog(
        masterPhyOffset, bodyData, dataStart, bodySize);

    this.dispatchPosition += msgHeaderSize + bodySize;

    if (!reportSlaveMaxOffsetPlus()) {
        return false;
    }
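reportSlaveMaxOffsetPlus, called right after the append, pushes the slave's new maximum offset back to the master so that the next batch starts from the right position. Roughly, it behaves like the sketch below; this is reconstructed from the calls visible in the surrounding code and is not guaranteed to match the HAClient source line for line.

    // Sketch: after appending, report the advanced local offset back to the master.
    private boolean reportSlaveMaxOffsetPlus() {
        boolean result = true;
        long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        if (currentPhyOffset > this.currentReportedOffset) {
            this.currentReportedOffset = currentPhyOffset;
            result = this.reportSlaveMaxOffset(this.currentReportedOffset);
            if (!result) {
                this.closeMaster();
            }
        }
        return result;
    }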
4. How Synchronous Replication Is Implemented
From the data replication flow we can see that data replication itself runs asynchronously, so how is synchronous replication achieved?
After the Master Broker receives a write request, it calls the asyncPutMessage method of Commitlog to write the message.
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
In this code, after the commitLog finishes appendMessage, two tasks still have to run: flushing to disk and synchronous replication.
These two tasks are not executed synchronously; instead they run asynchronously, using CompletableFuture.
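The pattern itself is plain java.util.concurrent: two independent futures are combined, and the caller only observes the combined result. The standalone demo below shows the same thenCombine idiom with made-up status values; it contains nothing RocketMQ-specific.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    // Standalone demo of the thenCombine pattern used above: flush and replication
    // run independently, and the combined future yields the "worst" of the two statuses.
    public class ThenCombineDemo {

        enum Status { PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT }

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Status> flushFuture =
                CompletableFuture.supplyAsync(() -> Status.PUT_OK);              // pretend flushing succeeded
            CompletableFuture<Status> replicaFuture =
                CompletableFuture.supplyAsync(() -> Status.FLUSH_SLAVE_TIMEOUT); // pretend the slave timed out

            CompletableFuture<Status> combined = flushFuture.thenCombine(replicaFuture, (flush, replica) -> {
                if (flush != Status.PUT_OK) {
                    return flush;
                }
                return replica; // PUT_OK only when both tasks succeeded
            });

            System.out.println(combined.get()); // prints FLUSH_SLAVE_TIMEOUT
        }
    }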
When the HAConnection read service receives a progress report from the Slave and sees that the message data has been replicated, it notifies the HAService; the doWaitTransfer loop below then completes (wakes up) the corresponding future once the replicated offset push2SlaveMaxOffset catches up with the request.
    private void doWaitTransfer() {
        if (!this.requestsRead.isEmpty()) {
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                long deadLine = req.getDeadLine();
                while (!transferOK && deadLine - System.nanoTime() > 0) {
                    this.notifyTransferObject.waitForRunning(1000);
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }

                req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }

            this.requestsRead = new LinkedList<>();
        }
    }
Finally, the Broker assembles the response command and returns it to the client.
5. Summary
The implementation of RocketMQ master-slave replication is quite straightforward: the Slave starts a thread that continuously pulls Commit Log data from the Master and then asynchronously builds the Consume Queue data structures.
The key points:
1. Master-slave replication consists of two parts: metadata replication and message data replication.
2. Metadata replication
A scheduled task on the Slave Broker sends an RPC request to the Master Broker every 10 seconds, synchronizes the metadata into its cache, and then persists it to disk.
3. Message data replication
The Master starts up and listens on the designated port;
The Slave starts the HAClient service and establishes a TCP connection to the Master;
The Slave reports its storage progress to the Master;
The Master receives the progress, retrieves all messages after that offset from the message files, and transfers them to the Slave;
The Slave receives the data and appends it to its local message store.
4. Implementation of synchronous replication
After the commitLog finishes appendMessage, the flush task and the synchronous replication task both need to run; CompletableFuture is used to execute them asynchronously.
When the HAConnection read service receives the Slave's progress report and sees that the message data has been replicated successfully, it wakes up the future. Finally, the Broker assembles the response command and returns it to the client.