1. 订阅关系演示

保证订阅关系一致:同⼀个消费者 Group ID 下所有 Consumer 实例所订阅的 Topic 、Tag 必须完全⼀致。

如果订阅关系不⼀致,消息消费的逻辑就会混乱,甚⾄导致消息丢失。

1.1 正确订阅关系

image-20240505155100223

正确的订阅关系:多个 Group ID 订阅了多个 Topic,并且每个 Group ID ⾥的多个消费者的订阅关系保持了⼀致。

1.2 错误订阅关系

image-20240505155156387

单个 Group ID 订阅了多个 Topic,但是该 Group ID ⾥的多个消费者的订阅关系并没有保持⼀致。

总结:每个消费者实例内订阅⽅法的主题、 TAG、监听逻辑都需要保持⼀致。

image-20240505155600353

接下来,我们实验相同消费组,两种不正确的场景,看看消费者和 Broker 服务有什么异常。

  • 订阅主题不同,标签相同

  • 订阅主题相同,标签不同

2. 订阅主题不同,标签相同

image-20240505162718779

当我们启动两个消费者后,消费者组名: myconsumerGroup 。C1消费者订阅主题 TopicTest , C2消费者订阅主题 mytest 。

在 Broker 端的⽇志⾥,会不停的打印拉取消息失败的⽇志 :

1
2
2023-10-09 14:52:53 WARN PullMessageThread_2 -
the consumer's subscription not exist, group: myconsumerGroup, topic:TopicTest

那么在这种情况下,C1 消费者是不可能拉取到消息,也就不可能消费到最新的消息。

为什么呢 ? 我们知道客户端会定时的发送⼼跳包到 Broker 服务,⼼跳包中会包含消费者订阅信息,数据格式样例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
"subscriptionDataSet": [
{
"classFilterMode": false,
"codeSet": [],
"expressionType": "TAG",
"subString": "*",
"subVersion": 1696832107020,
"tagsSet": [],
"topic": "TopicTest"
},
{
"classFilterMode": false,
"codeSet": [],
"expressionType": "TAG",
"subString": "*",
"subVersion": 1696832098221,
"tagsSet": [],
"topic": "%RETRY%myconsumerGroup"
}
]

Broker 服务会调⽤ ClientManageProcessor 的 heartBeat ⽅法处理⼼跳请求。

最终跟踪到代码: org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer

image-20240505163322771

Broker 服务的会保存消费者信息,消费者信息存储在消费者表 consumerTable 。消费者表以消费组名为 key , 值为消费者组信息 ConsumerGroupInfo 。

1
2
3
#org.apache.rocketmq.broker.client.ConsumerManager
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

如果消费组的消费者信息 ConsumerGroupInfo 为空,则新建新的对象。

更新订阅信息时,订阅信息是按照消费组存放的,这步骤就会导致同⼀个消费组内的各个消费者客户端的订阅信息相互被覆盖。

回到消费者客户端,当消费者拉取消息时,Broker 服务会调⽤ PullMessageProcessor 的 processRequest ⽅法 。

⾸先会进⾏前置判断,查询当前的主题的订阅信息若该主题的订阅信息为空,则打印告警⽇志,并返回异常的响应结果。

1
2
3
4
5
6
7
8
9
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()); 
if (null == subscriptionData) {
log.warn("the consumer's subscription not exist, group: {}, topic:{}",
requestHeader.getConsumerGroup(),
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" +
FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}

通过调研 Broker 端的代码,我们发现:相同消费组的订阅信息必须保持⼀致 , 否则同⼀个消费组内的各个消费者客户端的订阅信息相互被覆盖,从⽽导致某个消费者客户端⽆法拉取到新的消息。

C1消费者⽆法消费主题 TopicTest 的消息数据,那么 C2 消费者订阅主题 mytest ,消费会正常吗 ?

image-20240505163802989

从上图来看,依然有问题。 主题 mytest 有四个队列,但只有两个队列被分配了, 另外两个队列的消息就没有办法消费了。

要解释这个问题,我们需要重新温习负载均衡的原理。

负载均衡服务会根据消费模式为”⼴播模式”还是“集群模式”做不同的逻辑处理,这⾥主要来看下集群模式下的主要处理流程:

(1) 获取该主题下的消息消费队列集合;

(2) 查询 Broker 端获取该消费组下消费者 Id 列表;

(3) 先对 Topic 下的消息消费队列、消费者 Id 排序,然后⽤消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列;

image-20240505164547955

这⾥的平均分配算法,类似于分⻚的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端排好序类似⻚数,并求出每⼀⻚需要包含的平均 size 和每个⻚⾯记录的范围 range ,最后遍历整个 range ⽽计算出当前消费端应该分配到的记录。

(4) 分配到的消息队列集合与 processQueueTable 做⼀个过滤⽐对操作。

image-20240504231355966

消费者实例内 ,processQueueTable 对象存储着当前负载均衡的队列 ,以及该队列的处理队列 processQueue (消费快照)。

  1. 标红的 Entry 部分表示与分配到的消息队列集合互不包含,则需要将这些红⾊队列 Dropped 属性为 true , 然后从processQueueTable 对象中移除。

  2. 绿⾊的 Entry 部分表示与分配到的消息队列集合的交集,processQueueTable 对象中已经存在该队列。

  3. ⻩⾊的 Entry 部分表示这些队列需要添加到 processQueueTable 对象中,为每个分配的新队列创建⼀个消息拉取请求 pullRequest , 在消息拉取请求中保存⼀个处理队列 processQueue (队列消费快照),内部是红⿊树( TreeMap ),⽤来保存拉取到的消息。

最后创建拉取消息请求列表,并将请求分发到消息拉取服务,进⼊拉取消息环节

通过上⾯的介绍 ,通过负载均衡的原理推导,原因就显⽽易⻅了。

image-20240505164337295

C1消费者被分配了队列 0、队列 1 ,但是 C1消费者本身并没有订阅主题 mytest , 所以⽆法消费该主题的数据。

从本次实验来看,C1消费者⽆法消费主题 TopicTest 的消息数据 , C2 消费者只能部分消费主题 mytest 的消息数据。

但是因为在 Broker 端,同⼀个消费组内的各个消费者客户端的订阅信息相互被覆盖,所以这种消费状态⾮常混乱,偶尔也会切换成:C1消费者可以部分消费主题 TopicTest 的消息数据 , C2消费者⽆法消费主题 mytest 的消息数据。

3. 订阅主题相同,标签不同

image-20240505164659966

如图,C1 消费者和 C2 消费者订阅主题 TopicTest ,但两者的标签 TAG 并不相同。

启动消费者服务之后,从控制台观察,负载均衡的效果也如预期⼀般正常。

image-20240505164801487

笔者在 Broker 端打印埋点⽇志,发现主题 TopicTest 的订阅信息为 :

1
2
3
4
5
6
7
8
9
{
"classFilterMode": false,
"codeSet": [66],
"expressionType": "TAG",
"subString": "B",
"subVersion": 1696901014319,
"tagsSet": ["B"],
"topic": "TopicTest"
}

那么这种状态,消费正常吗 ?笔者做了⼀组实验,消费依然混乱:

C1 消费者⽆法消费 TAG 值为 A **的消息 ,C2 消费者只能消费部分TAG 值为 B 的消息。

想要理解原因,我们需要梳理消息过滤机制。

⾸先 ConsumeQueue ⽂件的格式如下 :

image-20240505165122090

  1. Broker 端在接收到拉取请求后,根据请求参数定位 ConsumeQueue ⽂件,然后遍历 ConsumeQueue 待检索的条⽬, 判断条⽬中存储 Tag 的 hashcode 是否和订阅信息中 TAG 的 hashcode 是否相同,若不符合,则跳过,继续对⽐下⼀个, 符合条件的聚合后返回给消费者客户端。

  2. 消费者在收到过滤后的消息后,也要执⾏过滤机制,只不过过滤的是 TAG 字符串的值,⽽不是 hashcode 。

我们模拟下消息过滤的过程:

image-20240505165320735

⾸先,⽣产者将不同的消息发送到 Broker 端,不同的 TAG 的消息会发送到保存的不同的队列中。C1 消费者从队列 0 ,队列 1 中拉取消息时,因为 Broker 端该主题的订阅信息中 TAG 值为 B ,经过服务端过滤后, C1消费者拉取到的消息的 TAG 值都是 B , 但消费者在收到过滤的消息后,也需要进⾏客户端过滤,A 并不等于 B ,所以 C1消费者⽆法消费 TAG 值为 A 的消息。

C2 消费者从队列 2, 队列 3 中拉取消息,整个逻辑链路是正常的 ,但是因为负载均衡的缘故,它⽆法消费队列 0 ,队列 1的消息。

4. 总结

什么是消费组 ?消费同⼀类消息且消费逻辑⼀致 。

RocketMQ 4.X 源码实现就是为了和消费组的定义保持⼀致 ,假如订阅关系不⼀致,那么代码执⾏逻辑就会出现混乱。

规避订阅关系不⼀致这个问题有两种⽅式:

  • 合理定义好主题和标签

当我们定义好主题和标签后,需要添加新的标签时,是否可以换⼀个思路:换⼀个新的消费组或者新建⼀个主题。

  • 严格规范上线流程

在上线之前,梳理好相关依赖服务,梳理好上线流程,做好上线评审,并严格按照流程执⾏。

最后的思考:

假如从基础架构层⾯来思考,将订阅关系信息中⼼化来设计,应该也可以实现 ,但成本较⾼,对于中⼩企业来讲,并不合算。


本站由 卡卡龙 使用 Stellar 1.27.0 主题创建

本站访问量 次. 本文阅读量 次.