1. 基础概念

RocketMQ ⽀持两种消息模式: 集群消费 ( Clustering )和 ⼴播消费 ( Broadcasting )。

集群消费

同⼀ Topic 下的⼀条消息只会被同⼀消费组中的⼀个消费者消费。也就是说,消息被负载均衡到了同⼀个消费组的多个消费者实例上。

⼴播消费

当使⽤⼴播消费模式时,每条消息推送给集群内所有的消费者,保证消息⾄少被每个消费者消费⼀次。

2. 源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class PushConsumer {
public static final String CONSUMER_GROUP = "myconsumerGroup";
public static final String DEFAULT_NAMESRVADDR = "localhost:9876";
public static final String TOPIC = "mytest";
public static final String SUB_EXPRESSION = "TagA || TagC || TagD";
public static void main(String[] args) throws InterruptedException, MQClientException {
// 定义 DefaultPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// 定义名字服务地址
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// 定义消费读取位点
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 定义消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 订阅主题信息
consumer.subscribe(TOPIC, SUB_EXPRESSION);
// 订阅消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
for (MessageExt messageExt : msgs) {
System.out.println(new String(messageExt.getBody()));
}
}catch (Exception e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}

和集群消费不同的点在于下⾯的代码:

1
consumer.setMessageModel(MessageModel.BROADCASTING);

接下来,我们从源码⻆度来看看⼴播消费和集群消费有哪些差异点 ?

⾸先进⼊ DefaultMQPushConsumerImpl 类的 start ⽅法 , 分析启动流程中他们两者的差异点:

差异点1:拷⻉订阅关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}

if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}

在集群模式下,会⾃动订阅重试队列,⽽⼴播模式下,并没有这段代码。也就是说⼴播模式下,不⽀持消息重试

差异点2:本地进度存储

1
2
3
4
5
6
7
8
9
10
11
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

我们可以看到消费进度存储的对象是: LocalFileOffsetStore , 进度⽂件存储在如下的主⽬录 /{⽤户主目录}/.rocketmq_offsets 。

1
2
3
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");

进度⽂件是 /mqClientId/{consumerGroupName}/offsets.json 。

1
2
3
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
this.mQClientFactory.getClientId() + File.separator + this.groupName + File.separator +
"offsets.json";

笔者创建了⼀个主题 mytest , 包含4个队列,进度⽂件内容如下

image-20240506215526195

消费者启动后,我们可以将整个流程简化如下图,并继续整理差异点:

image-20240506215612544

差异点3:改在均衡该主题的所有的MessageQueue

进⼊负载均衡抽象类 RebalanceImpl 的 rebalanceByTopic ⽅法 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}

if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}

if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);

Collections.sort(mqAll);
Collections.sort(cidAll);

AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}

Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}

boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}

从上⾯代码我们可以看到消息模式为⼴播消费模式时,消费者会消费该主题下所有的队列,这⼀点也可以从本地的进度⽂件 offsets.json 得到印证。

差异点4:不支持顺序消息

我们知道消费消息顺序服务会向 Borker 申请锁 。消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔 20 秒会重新尝试。

1
2
3
4
5
6
7
8
9
10
11
12
13
if(MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
} catch (Throwable e) {
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
}
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}

但是从上⾯的代码,我们发现只有在集群消费的时候才会定时申请锁,这样就会导致⼴播消费时,⽆法为负载均衡的队列申请锁,导致拉取消息服务⼀直⽆法获取消息数据。

笔者修改消费例⼦,在消息模式为⼴播模式的场景下,将消费模式从并发消费修改为顺序消费。

1
2
3
4
5
6
7
8
9
10
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
try {
for (MessageExt messageExt : msgs) {
System.out.println(new String(messageExt.getBody()));
}
}catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
});

因此,⼴播消费模式并不⽀持顺序消息

差异点5:并发消息失败时,没有重试

进⼊并发消息消费类 ConsumeMessageConcurrentlyService 的处理消费结果⽅法 processConsumeResult 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}",
msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>
(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue());
}
break;
default:
break;
}

消费消息失败后,集群消费时,消费者实例会通过 CONSUMER_SEND_MSG_BACK 请求,将失败消息发回到 Broker端。

但在⼴播模式下,仅仅是打印了消息信息。因此,⼴播模式下,并没有消息重试

3. 实战案例

⼴播消费主要⽤于两种场景:消息推送缓存同步

3.1 消息推送

笔者第⼀次接触⼴播消费的业务场景是神州专⻋司机端的消息推送。

⽤户下单之后,订单系统⽣成专⻋订单,派单系统会根据相关算法将订单派给某司机,司机端就会收到派单推送。

推送服务是⼀个 TCP 服务(⾃定义协议),同时也是⼀个消费者服务,消息模式是⼴播消费。

司机打开司机端 APP 后,APP 会通过负载均衡和推送服务创建⻓连接,推送服务会保存 TCP 连接引⽤ (⽐如司机编号和 TCP channel 的引⽤)。

派单服务是⽣产者,将派单数据发送到 MetaQ , 每个推送服务都会消费到该消息,推送服务判断本地内存中是否存在该司机的 TCP channel , 若存在,则通过 TCP 连接将数据推送给司机端。

肯定有同学会问:假如⽹络原因,推送失败怎么处理 ?有两个要点:

  1. 司机端 APP 定时主动拉取派单信息;

  2. 当推送服务没有收到司机端的 ACK 时 ,也会⼀定时限内再次推送,达到阈值后,不再推送。

3.2 缓存同步

4. 总结

集群消费和⼴播消费模式下,各功能的⽀持情况如下:

功能 集群消费 广播消费
顺序消息 支持 不支持
重置消费位点 支持 不支持
消息重试 支持 不支持
消费进度 服务端维护 客户端维护

⼴播消费主要⽤于两种场景:消息推送缓存同步


本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.