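1. Synchronous and Asynchronous Replication

In RocketMQ's cluster mode, Brokers are divided into Masters and Slaves. One Master can have multiple Slaves, but a Slave can belong to only one Master.

Each Broker keeps long-lived connections to every node in the Name Server cluster and periodically registers its Topic information with all of them.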


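The Master node accepts write requests from clients and persists messages to disk. The Slave node replicates message data from the Master and keeps itself in sync with it.

1.1 Synchronous replication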


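Each Master is configured with one Slave, and the cluster contains multiple Master-Slave pairs. HA uses synchronous double write: success is returned to the application only after both the Master and the Slave have written the message.

The pros and cons of this mode are:

  • Pros: neither data nor service has a single point of failure. If the Master goes down, messages are not delayed, and both service availability and data availability are very high.
  • Cons: performance is slightly lower than in asynchronous replication mode (roughly 10% lower), the RT for sending a single message is slightly higher, and in the current version the Slave cannot be promoted to Master automatically after the Master goes down.

1.2 Asynchronous replication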


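Each Master is configured with one Slave, and the cluster contains multiple Master-Slave pairs. HA uses asynchronous replication, so the Slave lags the Master by a short delay (milliseconds). The pros and cons of this mode are:

  • Pros: even if a disk is damaged, very few messages are lost, and message timeliness is not affected. After the Master goes down, consumers can still consume from the Slave. This switch is transparent to the application and needs no manual intervention, and performance is almost the same as in multi-Master mode.
  • Cons: if the Master goes down and its disk is damaged, a small number of messages are lost.

The replication process has two parts, metadata replication and message data replication:

  • Master and Slave synchronize topics, consumer offsets, delayed-message offsets, and consumer group configuration
  • Master and Slave synchronize message data

2. Metadata replication

A scheduled task on the Slave Broker synchronizes metadata every 10 seconds. The metadata covers topics, consumer offsets, delayed-message offsets, and consumer group configuration.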

public void syncAll() {
    // Sync topic configuration
    this.syncTopicConfig();
    // Sync the consumer offset file
    this.syncConsumerOffset();
    // Sync delayed-message offsets
    this.syncDelayOffset();
    // Sync consumer group (subscription group) configuration
    this.syncSubscriptionGroupConfig();
}
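
To make the 10-second cadence concrete, here is a minimal sketch of how a periodic sync task could be driven with a `ScheduledExecutorService`. The class `MetadataSyncScheduler` and its parameters are hypothetical names for illustration, not RocketMQ code, and the real broker wiring may differ.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of RocketMQ): runs a sync task at a fixed period.
final class MetadataSyncScheduler implements AutoCloseable {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();

    // Runs syncAll after initialDelayMillis, then every periodMillis.
    void start(Runnable syncAll, long initialDelayMillis, long periodMillis) {
        executor.scheduleAtFixedRate(() -> {
            try {
                syncAll.run();
            } catch (Throwable t) {
                // An uncaught exception would cancel all later runs,
                // so one failed round is only logged here.
                System.err.println("metadata sync failed: " + t);
            }
        }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

With a period of `10_000` milliseconds this gives the 10-second interval described above. Catching the exception inside the task matters because `scheduleAtFixedRate` suppresses subsequent executions once a run throws.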

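To sync topics, the Slave Broker sends an RPC request to the Master Broker. When the data comes back, the Slave first puts it into its local cache and then persists it locally.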

private void syncTopicConfig() {
    String masterAddrBak = this.masterAddr;
    // Only sync when a master address is known and it is not this broker itself
    if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
        try {
            // RPC to the master to fetch all topic configuration
            TopicConfigSerializeWrapper topicWrapper =
                this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
            // Update only when the master's data version differs from the local one
            if (!this.brokerController.getTopicConfigManager().getDataVersion()
                .equals(topicWrapper.getDataVersion())) {

                this.brokerController.getTopicConfigManager().getDataVersion()
                    .assignNewOne(topicWrapper.getDataVersion());
                // Replace the in-memory table, then persist it to local disk
                this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                this.brokerController.getTopicConfigManager().getTopicConfigTable()
                    .putAll(topicWrapper.getTopicConfigTable());
                this.brokerController.getTopicConfigManager().persist();

                log.info("Update slave topic config from master, {}", masterAddrBak);
            }
        } catch (Exception e) {
            log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
        }
    }
}

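3. Message data replication

Message data synchronization between the Master and the Slave proceeds through the following steps.

3.1 The Master starts and listens on the configured port

After the Master starts, it creates an AcceptSocketService, which accepts TCP connections from clients (Slaves) to the server.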

class AcceptSocketService extends ServiceThread {
private final SocketAddress socketAddressListen;
private ServerSocketChannel serverSocketChannel;
private Selector selector;

public AcceptSocketService(final int port) {
this.socketAddressListen = new InetSocketAddress(port);
}

/**
* Starts listening to slave connections.
*
* @throws Exception If fails.
*/
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

/**
* {@inheritDoc}
*/
@Override
public void shutdown(final boolean interrupt) {
super.shutdown(interrupt);
try {
this.serverSocketChannel.close();
this.selector.close();
} catch (IOException e) {
log.error("AcceptSocketService shutdown exception", e);
}
}

/**
* {@inheritDoc}
*/
@Override
public void run() {
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();

if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());

try {
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}

selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}

log.info(this.getServiceName() + " service end");
}

/**
* {@inheritDoc}
*/
@Override
public String getServiceName() {
return AcceptSocketService.class.getSimpleName();
}
}

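RocketMQ models each connection as an HAConnection object. An HAConnection starts two threads, one for the read service and one for the write service:

  • Read service: handles requests sent by the Slave
  • Write service: transfers data to the Slave

3.2 The Slave starts, tries to connect to the Master, and establishes a TCP connection

HAClient is the core class on the Slave (client) side. It is responsible for connecting to the Master and exchanging data with it.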

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {

            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // Listen for data pushed by the master
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }

        // Start reporting from the slave's current maximum physical offset
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

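After the client starts, it first tries to connect to the Master, then queries the maximum physical offset in its local message store and saves it in the variable currentReportedOffset.

3.3 The Slave reports its pull offset to the Master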

if (this.isTimeToReportOffset()) {
    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
    if (!result) {
        this.closeMaster();
    }
}

The progress report has a very compact format: a single Long offset, 8 bytes.

private boolean reportSlaveMaxOffset(final long maxOffset) {
    // Write the offset into the reusable 8-byte buffer, then rewind it for sending
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    // A non-blocking write may be partial, so try up to 3 times
    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    // Success only if all 8 bytes were written
    return !this.reportOffset.hasRemaining();
}
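
The 8-byte report can be pictured with a small round trip. The sketch below is illustrative only: `OffsetReport` is a hypothetical class name, and it allocates a fresh buffer per call, whereas the method above reuses a single buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not part of RocketMQ): encodes and decodes the 8-byte offset report.
final class OffsetReport {
    static final int SIZE = Long.BYTES; // 8 bytes

    // Returns a buffer ready to be written to a channel (position 0, limit 8).
    static ByteBuffer encode(long maxOffset) {
        ByteBuffer buf = ByteBuffer.allocate(SIZE);
        buf.putLong(maxOffset); // ByteBuffer is big-endian by default
        buf.flip();
        return buf;
    }

    // Reads the offset at the buffer's current position without consuming it.
    static long decode(ByteBuffer buf) {
        return buf.getLong(buf.position());
    }
}
```

Because both sides use `ByteBuffer` with its default big-endian byte order, no extra framing or length prefix is needed for this message.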

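Once the data has been written to the socket buffer, the last write time lastWriteTimestamp is updated.

3.4 The Master parses the requested offset and retrieves all messages after it from the message files

When the Slave reports data to the Master, a SelectionKey.OP_READ event fires and the Master hands the request to the ReadSocketService: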

public void run() {
HAConnection.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}

long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}

this.makeStop();

writeSocketService.makeStop();

haService.removeConnection(HAConnection.this);

HAConnection.this.haService.getConnectionCount().decrementAndGet();

SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}

try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}

HAConnection.log.info(this.getServiceName() + " service end");
}

When the Slave Broker sends the maxPhyOffset of its own commitlog, the Master's selector.select(1000) call returns immediately and the processReadEvent method runs.

private boolean processReadEvent() {
int readSizeZeroTimes = 0;

if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPosition = 0;
}

while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPosition = pos;

HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
} else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
HAConnection.this.clientAddr,
HAConnection.this.slaveAckOffset,
HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
return false;
}

HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}

return true;
}

The core logic of processReadEvent is to record the Slave's current offset and then notify the replication thread of the current replication progress.
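
The offset extraction in processReadEvent relies on a small piece of arithmetic: several 8-byte reports may have accumulated in the read buffer, and only the most recent complete one matters. The sketch below isolates that step. `AckOffsetReader` is a hypothetical name, and the `-1` return value is a convention of this sketch, not of RocketMQ.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not part of RocketMQ): picks the newest complete offset report.
final class AckOffsetReader {
    // buf is in write mode: bytes [0, buf.position()) have been received so far.
    // Returns the last complete 8-byte offset, or -1 if fewer than 8 new bytes
    // have arrived since processPosition.
    static long latestOffset(ByteBuffer buf, int processPosition) {
        if (buf.position() - processPosition < 8) {
            return -1L;
        }
        // Round the position down to a multiple of 8 to skip a trailing partial report.
        int pos = buf.position() - (buf.position() % 8);
        return buf.getLong(pos - 8);
    }
}
```

For example, with 19 bytes received (two full reports plus 3 bytes of a third), the position is rounded down to 16 and the value at bytes 8 to 15 is returned.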

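The write service, WriteSocketService, retrieves all messages after that offset from the message files (capped by the transfer batch size) and sends the message data to the Slave.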

public void run() {
HAConnection.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
this.selector.select(1000);

if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}

if (-1 == this.nextTransferFromWhere) {
if (0 == HAConnection.this.slaveRequestOffset) {
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());

if (masterOffset < 0) {
masterOffset = 0;
}

this.nextTransferFromWhere = masterOffset;
} else {
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}

log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}

if (this.lastWriteOver) {

long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {

// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();

this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}

SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}

long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;

selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;

// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();

this.lastWriteOver = this.transferData();
} else {

HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {

HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}

HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

if (this.selectMappedBufferResult != null) {
this.selectMappedBufferResult.release();
}

this.makeStop();

readSocketService.makeStop();

haService.removeConnection(HAConnection.this);

SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}

try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}

HAConnection.log.info(this.getServiceName() + " service end");
}
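
The frame the write service sends is a 12-byte header (8-byte physical offset plus 4-byte body size) followed by the body, and a heartbeat is simply a header with size 0. The sketch below builds such a frame in one buffer for illustration. `HaFrame` and its methods are hypothetical names, and unlike the code above, which sends the header buffer and the mapped commitlog slice separately, this sketch copies the body.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not part of RocketMQ): builds a header + body frame.
final class HaFrame {
    static final int HEADER_SIZE = 8 + 4; // physical offset + body size

    // Builds a frame carrying at most batchSize bytes of data starting at offset.
    static ByteBuffer build(long offset, byte[] data, int batchSize) {
        int size = Math.min(data.length, batchSize); // cap the batch
        ByteBuffer frame = ByteBuffer.allocate(HEADER_SIZE + size);
        frame.putLong(offset);
        frame.putInt(size);
        frame.put(data, 0, size);
        frame.flip();
        return frame;
    }

    // A heartbeat is a header whose body size is 0.
    static ByteBuffer heartbeat(long nextOffset) {
        return build(nextOffset, new byte[0], 0);
    }
}
```

After sending a frame, the sender would advance its next transfer offset by the capped size, which mirrors `nextTransferFromWhere += size` in the loop above.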

3.5 The Slave receives the data and appends it to the commitlog message file

private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size

while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
if (diff >= msgHeaderSize) {
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}

if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = byteBufferRead.array();
int dataStart = this.dispatchPosition + msgHeaderSize;

HAService.this.defaultMessageStore.appendToCommitLog(
masterPhyOffset, bodyData, dataStart, bodySize);

this.dispatchPosition += msgHeaderSize + bodySize;

if (!reportSlaveMaxOffsetPlus()) {
return false;
}

continue;
}
}

if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}

break;
}

return true;
}

First, the HAClient class calls the dispatchReadRequest method to parse out the message data.

It then appends the message data to the local message store.

byte[] bodyData = byteBufferRead.array();
int dataStart = this.dispatchPosition + msgHeaderSize;

HAService.this.defaultMessageStore.appendToCommitLog(
    masterPhyOffset, bodyData, dataStart, bodySize);

this.dispatchPosition += msgHeaderSize + bodySize;

if (!reportSlaveMaxOffsetPlus()) {
    return false;
}
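
The parsing loop can be tried in isolation. The following sketch walks a receive buffer, extracts every complete frame, and stops at a partial one so that parsing can resume when more bytes arrive. `HaFrameParser` is a hypothetical name; the sketch collects bodies into a list instead of appending to a commitlog, and it omits the check that the master's offset matches the slave's maximum offset.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical helper (not part of RocketMQ): extracts complete frames from a buffer.
final class HaFrameParser {
    static final int HEADER_SIZE = 8 + 4; // physical offset + body size

    // buf is in write mode: bytes [0, buf.position()) have been received.
    // Parses complete frames starting at 'start', adds each body to 'bodies',
    // and returns the new dispatch position (start of the first incomplete frame).
    static int parse(ByteBuffer buf, int start, List<byte[]> bodies) {
        int dispatchPosition = start;
        while (buf.position() - dispatchPosition >= HEADER_SIZE) {
            int bodySize = buf.getInt(dispatchPosition + 8);
            if (buf.position() - dispatchPosition < HEADER_SIZE + bodySize) {
                break; // body not fully received yet
            }
            byte[] body = new byte[bodySize];
            ByteBuffer view = buf.duplicate(); // independent position, shared content
            view.position(dispatchPosition + HEADER_SIZE);
            view.get(body);
            bodies.add(body);
            dispatchPosition += HEADER_SIZE + bodySize;
        }
        return dispatchPosition;
    }
}
```

Returning the dispatch position rather than mutating shared state keeps the sketch easy to test; the real client keeps it in a field, as shown in dispatchReadRequest.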

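4. How synchronous replication is implemented

The replication flow above shows that data replication itself runs asynchronously. So how is synchronous replication achieved?

After the Master Broker receives a request to write a message, it calls the asyncPutMessage method of CommitLog to write it.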

CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    if (flushStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(flushStatus);
    }
    if (replicaStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(replicaStatus);
    }
    return putMessageResult;
});

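In this code, once commitLog has finished appendMessage, two tasks must run: flushing to disk and synchronous replication.

These two tasks do not run synchronously. They run asynchronously, using CompletableFuture.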
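
The combining pattern can be tried on its own with plain JDK types. The sketch below is a simplified stand-in: `PutResultCombiner` and its `Status` enum are hypothetical and only mimic the role of PutMessageStatus.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper (not part of RocketMQ): merges the flush and replica outcomes.
final class PutResultCombiner {
    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT }

    // The combined future completes only after BOTH inputs have completed.
    static CompletableFuture<Status> combine(CompletableFuture<Status> flush,
                                             CompletableFuture<Status> replica) {
        return flush.thenCombine(replica, (flushStatus, replicaStatus) -> {
            Status result = Status.PUT_OK;
            if (flushStatus != Status.PUT_OK) {
                result = flushStatus;
            }
            if (replicaStatus != Status.PUT_OK) {
                result = replicaStatus; // a replica failure takes precedence
            }
            return result;
        });
    }
}
```

The thread that submits the write is not blocked: it gets the combined future back immediately, and the response can be assembled in a callback once both the flush and the replication have reported.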

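When the HAConnection read service receives the Slave's progress report and finds that the message data has been replicated successfully, it completes the future.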

private void doWaitTransfer() {
    if (!this.requestsRead.isEmpty()) {
        for (CommitLog.GroupCommitRequest req : this.requestsRead) {
            // Replication is done once the slave's acked offset reaches this request's end offset
            boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
            long deadLine = req.getDeadLine();
            // Wait (up to 1 second per round) until acked or the deadline passes
            while (!transferOK && deadLine - System.nanoTime() > 0) {
                this.notifyTransferObject.waitForRunning(1000);
                transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
            }

            // Complete the request's future with success or a slave-flush timeout
            req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
        }

        this.requestsRead = new LinkedList<>();
    }
}
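
The link between a Slave acknowledgement and a waiting write can also be sketched without the wait loop. The example below swaps the polling approach above for an event-driven one and leaves out the deadline handling, so it illustrates the idea rather than RocketMQ's mechanism. `ReplicaAckTracker` and its methods are hypothetical names.

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of RocketMQ): completes futures as slave acks arrive.
final class ReplicaAckTracker {
    private static final class Pending {
        final long nextOffset;
        final CompletableFuture<Boolean> future = new CompletableFuture<>();
        Pending(long nextOffset) { this.nextOffset = nextOffset; }
    }

    private final AtomicLong slaveAckOffset = new AtomicLong(-1L);
    private final ConcurrentLinkedQueue<Pending> pending = new ConcurrentLinkedQueue<>();

    // Called by the writer: the future completes once the slave has acked nextOffset.
    CompletableFuture<Boolean> awaitReplication(long nextOffset) {
        Pending p = new Pending(nextOffset);
        pending.add(p);
        completeSatisfied(); // the ack may already have arrived
        return p.future;
    }

    // Called by the read service when the slave reports its offset.
    void onSlaveAck(long ackOffset) {
        slaveAckOffset.accumulateAndGet(ackOffset, Math::max); // never move backwards
        completeSatisfied();
    }

    private void completeSatisfied() {
        long acked = slaveAckOffset.get();
        for (Iterator<Pending> it = pending.iterator(); it.hasNext(); ) {
            Pending p = it.next();
            if (acked >= p.nextOffset) {
                p.future.complete(Boolean.TRUE);
                it.remove();
            }
        }
    }
}
```

A production version would also need a timeout path, corresponding to FLUSH_SLAVE_TIMEOUT above, so that a stalled Slave cannot hold a write open forever.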

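Finally, the Broker assembles the response command and returns it to the client.

5. Summary

The idea behind RocketMQ's master-slave replication is simple: the Slave starts a thread that keeps pulling Commit Log data from the Master and then builds the Consume Queue data structures asynchronously.

The key points are:

1) Master-slave replication has two parts: metadata replication and message data replication.

2) Metadata replication

Every 10 seconds, a scheduled task on the Slave Broker sends an RPC request to the Master Broker, loads the returned metadata into its cache, and then persists it to disk.

3) Message data replication

  1. The Master starts and listens on the configured port

  2. The Slave starts the HAClient service and opens a TCP connection to the Master

  3. The Slave reports its storage progress to the Master

  4. The Master receives the progress, retrieves all messages after that offset from the message files, and transfers them to the Slave

  5. The Slave receives the data and appends the message data to its local message store

4) How synchronous replication is implemented

Once commitLog has finished appendMessage, two tasks must run: flushing to disk and synchronous replication. Both are coordinated with CompletableFuture.

When the HAConnection read service receives the Slave's progress report and finds that the message data has been replicated successfully, it completes the future. Finally, the Broker assembles the response command and returns it to the client.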

