1. 基础概念
RocketMQ ⽀持两种消息模式: 集群消费 ( Clustering )和 ⼴播消费 ( Broadcasting )。
集群消费:
同⼀ Topic 下的⼀条消息只会被同⼀消费组中的⼀个消费者消费。也就是说,消息被负载均衡到了同⼀个消费组的多个消费者实例上。
⼴播消费:
当使⽤⼴播消费模式时,每条消息推送给集群内所有的消费者,保证消息⾄少被每个消费者消费⼀次。
2. 源码解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class PushConsumer { public static final String CONSUMER_GROUP = "myconsumerGroup"; public static final String DEFAULT_NAMESRVADDR = "localhost:9876"; public static final String TOPIC = "mytest"; public static final String SUB_EXPRESSION = "TagA || TagC || TagD"; public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe(TOPIC, SUB_EXPRESSION); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try { for (MessageExt messageExt : msgs) { System.out.println(new String(messageExt.getBody())); } }catch (Exception e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); } }
|
和集群消费不同的点在于下⾯的代码:
1
| consumer.setMessageModel(MessageModel.BROADCASTING);
|
接下来,我们从源码⻆度来看看⼴播消费和集群消费有哪些差异点 ?
⾸先进⼊ DefaultMQPushConsumerImpl 类的 start ⽅法 , 分析启动流程中他们两者的差异点:
差异点1:拷⻉订阅关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| private void copySubscription() throws MQClientException { try { Map<String, String> sub = this.defaultMQPushConsumer.getSubscription(); if (sub != null) { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } }
if (null == this.messageListenerInner) { this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); }
switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } } catch (Exception e) { throw new MQClientException("subscription exception", e); } }
|
在集群模式下,会⾃动订阅重试队列,⽽⼴播模式下,并没有这段代码。也就是说⼴播模式下,不⽀持消息重试。
差异点2:本地进度存储
1 2 3 4 5 6 7 8 9 10 11
| switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
|
我们可以看到消费进度存储的对象是: LocalFileOffsetStore , 进度⽂件存储在如下的主⽬录 /{⽤户主目录}/.rocketmq_offsets 。
1 2 3
| public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty( "rocketmq.client.localOffsetStoreDir", System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
|
进度⽂件是 /mqClientId/{consumerGroupName}/offsets.json 。
1 2 3
| this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator + this.groupName + File.separator + "offsets.json";
|
笔者创建了⼀个主题 mytest , 包含4个队列,进度⽂件内容如下
消费者启动后,我们可以将整个流程简化如下图,并继续整理差异点:
差异点3:改在均衡该主题的所有的MessageQueue
进⼊负载均衡抽象类 RebalanceImpl 的 rebalanceByTopic ⽅法 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } } else { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } break; } case CLUSTERING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } }
if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); }
if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet);
Collections.sort(mqAll); Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; }
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); }
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; } }
|
从上⾯代码我们可以看到消息模式为⼴播消费模式时,消费者会消费该主题下所有的队列,这⼀点也可以从本地的进度⽂件 offsets.json 得到印证。
差异点4:不支持顺序消息
我们知道消费消息顺序服务会向 Borker 申请锁 。消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔 20 秒会重新尝试。
1 2 3 4 5 6 7 8 9 10 11 12 13
| if(MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } catch (Throwable e) { log.error("scheduleAtFixedRate lockMQPeriodically exception", e); } } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); }
|
但是从上⾯的代码,我们发现只有在集群消费的时候才会定时申请锁,这样就会导致⼴播消费时,⽆法为负载均衡的队列申请锁,导致拉取消息服务⼀直⽆法获取消息数据。
笔者修改消费例⼦,在消息模式为⼴播模式的场景下,将消费模式从并发消费修改为顺序消费。
1 2 3 4 5 6 7 8 9 10
| consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { try { for (MessageExt messageExt : msgs) { System.out.println(new String(messageExt.getBody())); } }catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; });
|
因此,⼴播消费模式并不⽀持顺序消息。
差异点5:并发消息失败时,没有重试
进⼊并发消息消费类 ConsumeMessageConcurrentlyService 的处理消费结果⽅法 processConsumeResult 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt> (consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; }
|
消费消息失败后,集群消费时,消费者实例会通过 CONSUMER_SEND_MSG_BACK 请求,将失败消息发回到 Broker端。
但在⼴播模式下,仅仅是打印了消息信息。因此,⼴播模式下,并没有消息重试。
3. 实战案例
⼴播消费主要⽤于两种场景:消息推送和缓存同步。
3.1 消息推送
笔者第⼀次接触⼴播消费的业务场景是神州专⻋司机端的消息推送。
⽤户下单之后,订单系统⽣成专⻋订单,派单系统会根据相关算法将订单派给某司机,司机端就会收到派单推送。
推送服务是⼀个 TCP 服务(⾃定义协议),同时也是⼀个消费者服务,消息模式是⼴播消费。
司机打开司机端 APP 后,APP 会通过负载均衡和推送服务创建⻓连接,推送服务会保存 TCP 连接引⽤ (⽐如司机编号和 TCP channel 的引⽤)。
派单服务是⽣产者,将派单数据发送到 MetaQ , 每个推送服务都会消费到该消息,推送服务判断本地内存中是否存在该司机的 TCP channel , 若存在,则通过 TCP 连接将数据推送给司机端。
肯定有同学会问:假如⽹络原因,推送失败怎么处理 ?有两个要点:
司机端 APP 定时主动拉取派单信息;
当推送服务没有收到司机端的 ACK 时 ,也会⼀定时限内再次推送,达到阈值后,不再推送。
3.2 缓存同步
4. 总结
集群消费和⼴播消费模式下,各功能的⽀持情况如下:
功能 |
集群消费 |
广播消费 |
顺序消息 |
支持 |
不支持 |
重置消费位点 |
支持 |
不支持 |
消息重试 |
支持 |
不支持 |
消费进度 |
服务端维护 |
客户端维护 |
⼴播消费主要⽤于两种场景:消息推送和缓存同步。