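1. Basic Concepts
A message trace is the complete end-to-end record of a message's journey: from the producer sending it to the Broker, through to the consumer consuming it. It is assembled from the timestamps, status, and other data collected at each node along the way.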
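When querying message traces, keep one point in mind: trace data is stored on the Broker side. We need to define a topic to hold it, and define trace hooks on the producer and consumer sides.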
2. Enabling Message Trace
2.1 Broker Configuration File
2.2 Producer Configuration
```java
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
```
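The producer constructor takes two core trace parameters: enableMsgTrace, which turns message tracing on or off, and customizedTraceTopic, the topic that stores the trace data (when it is blank, the default system topic RMQ_SYS_TRACE_TOPIC is used).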
Run the following producer code:
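```java
import java.util.UUID;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static final String PRODUCER_GROUP = "mytestGroup";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "example";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // The second argument (enableMsgTrace = true) turns on message tracing
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();
        try {
            // A random UUID is used as the message key so the message can be looked up later
            String key = UUID.randomUUID().toString();
            System.out.println(key);
            Message msg = new Message(
                    TOPIC,
                    TAG,
                    key,
                    "Hello RocketMQ ".getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Keep the process alive for a while, e.g. so asynchronously sent trace data can be flushed
        Thread.sleep(10000);
        producer.shutdown();
    }
}
```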
In the producer code we set the message's key property, which allows the message to be looked up efficiently.
After the code runs successfully, we can view the trace information in the console.
As the figure shows, the message trace records the message's store time, the IP of the Broker that stored it, and the time taken to send it.
2.3 Consumer Configuration
As with the producer, the trace parameters can be passed to the consumer's constructor:
```java
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace);
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic);
```
Run the following consumer code:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class Consumer {
    public static final String CONSUMER_GROUP = "exampleGroup";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "example";

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // The second argument (enableMsgTrace = true) turns on consume-side tracing
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true);
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
```
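3. Implementation Principles
Message trace is implemented mainly by adding hooks at the points where the producer sends a message and the consumer consumes one. So all we need to understand is how these hooks work.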
The code below is the DefaultMQProducer constructor.
```java
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
                         boolean enableMsgTrace, final String customizedTraceTopic) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    // If message trace is enabled, create the trace dispatcher and register the trace hooks
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup,
                    TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.defaultMQProducerImpl);
            traceDispatcher = dispatcher;
            this.defaultMQProducerImpl.registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));
            this.defaultMQProducerImpl.registerEndTransactionHook(
                    new EndTransactionTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}
```
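When the enableMsgTrace switch is on, the constructor creates an asynchronous trace dispatcher, AsyncTraceDispatcher, and then registers the send-message hook SendMessageTraceHookImpl on the default producer implementation (DefaultMQProducerImpl). It also registers EndTransactionTraceHookImpl for transactional messages. If initialization fails, the error is only logged, so the producer keeps working without tracing.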
```java
this.defaultMQProducerImpl.registerSendMessageHook(new SendMessageTraceHookImpl(traceDispatcher));
```
The producer's message-sending flow can be simplified to the following code:
```java
this.executeSendMessageHookBefore(context);

this.mQClientFactory.getMQClientAPIImpl().sendMessage(....);

this.executeSendMessageHookAfter(context);
```
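To make the before/send/after pattern concrete, here is a minimal, self-contained sketch that does not depend on RocketMQ. SendContext, SendHook, MiniProducer and TraceHook are hypothetical names invented for this illustration; they only mimic the shape of SendMessageContext, the send hook interface and SendMessageTraceHookImpl, not their actual APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class HookDemo {
    // Hypothetical stand-in for SendMessageContext: state shared by the before/after calls.
    static class SendContext {
        final String topic;
        long startMillis;
        String sendStatus;

        SendContext(String topic) {
            this.topic = topic;
        }
    }

    // Hypothetical hook interface with the same before/after shape as the trace hook.
    interface SendHook {
        void before(SendContext ctx);

        void after(SendContext ctx);
    }

    // Hypothetical producer that wraps every send with all registered hooks.
    static class MiniProducer {
        private final List<SendHook> hooks = new ArrayList<>();

        void registerSendHook(SendHook hook) {
            hooks.add(hook);
        }

        String send(String topic, String body) {
            SendContext ctx = new SendContext(topic);
            for (SendHook h : hooks) {
                h.before(ctx);             // plays the role of executeSendMessageHookBefore
            }
            ctx.sendStatus = "SEND_OK";    // placeholder for the real network call carrying body
            for (SendHook h : hooks) {
                h.after(ctx);              // plays the role of executeSendMessageHookAfter
            }
            return ctx.sendStatus;
        }
    }

    // Trace-like hook: stamps the start time before the send, records a trace entry after it.
    static class TraceHook implements SendHook {
        final List<String> records = new ArrayList<>();

        @Override
        public void before(SendContext ctx) {
            ctx.startMillis = System.currentTimeMillis();
        }

        @Override
        public void after(SendContext ctx) {
            long cost = System.currentTimeMillis() - ctx.startMillis;
            records.add(ctx.topic + "|" + ctx.sendStatus + "|" + cost);
        }
    }

    public static void main(String[] args) {
        MiniProducer producer = new MiniProducer();
        TraceHook trace = new TraceHook();
        producer.registerSendHook(trace);
        producer.send("example", "Hello RocketMQ");
        System.out.println(trace.records);
    }
}
```

Because the hook only sees the context object, tracing can be switched on or off simply by registering the hook or not, without touching the send path itself. This is the same reason DefaultMQProducer registers SendMessageTraceHookImpl only when enableMsgTrace is true.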
Now let's look at the SendMessageTraceHookImpl class. It has two main methods: sendMessageBefore and sendMessageAfter.
(1) The sendMessageBefore method
```java
public void sendMessageBefore(SendMessageContext context) {
    // Skip trace messages themselves, otherwise sending trace data would be traced again
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
        return;
    }
    // Build the trace context for this send
    TraceContext tuxeContext = new TraceContext();
    tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
    context.setMqTraceContext(tuxeContext);
    tuxeContext.setTraceType(TraceType.Pub);
    tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
    // Build the trace bean that holds the message's attributes
    TraceBean traceBean = new TraceBean();
    traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
    traceBean.setTags(context.getMessage().getTags());
    traceBean.setKeys(context.getMessage().getKeys());
    traceBean.setStoreHost(context.getBrokerAddr());
    traceBean.setBodyLength(context.getMessage().getBody().length);
    traceBean.setMsgType(context.getMsgType());
    tuxeContext.getTraceBeans().add(traceBean);
}
```
(2) The sendMessageAfter method
```java
public void sendMessageAfter(SendMessageContext context) {
    // Skip trace messages themselves and sends that have no trace context
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
        || context.getMqTraceContext() == null) {
        return;
    }
    if (context.getSendResult() == null) {
        return;
    }
    // Only record a trace when the send result carries a region ID and has the trace flag on
    if (context.getSendResult().getRegionId() == null
        || !context.getSendResult().isTraceOn()) {
        return;
    }
    TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
    TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
    int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
    tuxeContext.setCostTime(costTime);
    if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
        tuxeContext.setSuccess(true);
    } else {
        tuxeContext.setSuccess(false);
    }
    tuxeContext.setRegionId(context.getSendResult().getRegionId());
    traceBean.setMsgId(context.getSendResult().getMsgId());
    traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
    // Estimated store time: start timestamp plus half of the send cost
    traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
    localDispatcher.append(tuxeContext);
}
```
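The trace objects store costTime (the time taken to send the message), success (whether the send succeeded), regionId (the region of the Broker the message was sent to), msgId (the message ID, globally unique), offsetMsgId (an ID that encodes the storing Broker's address and the message's physical offset in the commit log), and storeTime (the store time).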
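The store time is not the message's actual store time on the Broker but an estimate: the trace start timestamp plus half of the client-side send cost (tuxeContext.getTimeStamp() + costTime / 2), i.e. the midpoint of the send.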
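A small worked example makes the estimate concrete. The sketch below reproduces only the arithmetic from sendMessageAfter; the class and method names are hypothetical and the timestamps are made-up values. Note that both divisions are integer divisions, so an odd cost time is rounded down.

```java
public class StoreTimeEstimate {
    // costTime = (now - traceTimestamp) / beanCount, as in sendMessageAfter
    static int costTime(long traceTimestamp, long now, int beanCount) {
        return (int) ((now - traceTimestamp) / beanCount);
    }

    // storeTime = traceTimestamp + costTime / 2 (integer division)
    static long estimatedStoreTime(long traceTimestamp, int costTime) {
        return traceTimestamp + costTime / 2;
    }

    public static void main(String[] args) {
        long start = 1_700_000_000_000L; // hypothetical timestamp of the trace context
        long end = start + 30;           // the send returned 30 ms later
        int cost = costTime(start, end, 1);
        System.out.println(cost + " " + estimatedStoreTime(start, cost)); // 30 1700000000015
    }
}
```

With a single trace bean, a 30 ms send is therefore recorded as stored 15 ms after the trace started, which assumes the request and response legs take roughly equal time.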
Finally, the trace context is appended to the local trace dispatcher:
```java
localDispatcher.append(tuxeContext);
```
Next, let's analyze how the trace dispatcher works:
```java
public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
    // queueSize is the capacity of traceExecutor's task queue (appenderQueue)
    this.queueSize = 2048;
    this.batchSize = 100;
    this.maxMsgSize = 128000;
    this.discardCount = new AtomicLong(0L);
    this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
    this.group = group;
    this.type = type;
    this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
    // Fall back to the default system trace topic when no custom topic is given
    if (!UtilAll.isBlank(traceTopicName)) {
        this.traceTopicName = traceTopicName;
    } else {
        this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
    }
    this.traceExecutor = new ThreadPoolExecutor(
            10,
            20,
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.appenderQueue,
            new ThreadFactoryImpl("MQTraceSendThread_"));
    traceProducer = getAndCreateTraceProducer(rpcHook);
}

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}
```
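The code above shows the dispatcher's constructor and its start method. The constructor creates traceExecutor, a thread pool for sending trace messages (10 core threads, at most 20, with a task queue of up to 2048 tasks), and calling start launches a daemon worker thread.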
```java
class AsyncRunnable implements Runnable {
    private boolean stopped;

    @Override
    public void run() {
        while (!stopped) {
            List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
            synchronized (traceContextQueue) {
                // Take up to batchSize contexts, waiting at most 5 ms for each one
                for (int i = 0; i < batchSize; i++) {
                    TraceContext context = null;
                    try {
                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                    }
                    if (context != null) {
                        contexts.add(context);
                    } else {
                        break;
                    }
                }
                // Hand the batch to the thread pool; stop once the dispatcher is shut down and nothing is left
                if (contexts.size() > 0) {
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                    traceExecutor.submit(request);
                } else if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }
        }
    }
}
```
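Once started, the worker keeps taking trace contexts out of the trace context queue traceContextQueue, up to batchSize (100) at a time, wraps each batch in an AsyncAppenderRequest and submits it to traceExecutor, where the contexts are converted into trace data segments (TraceDataSegment).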
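The queue-and-batch pattern used here can be sketched without RocketMQ. In the sketch below, BatchDrainDemo, append, drainBatch and discarded are hypothetical names; it uses plain strings instead of TraceContext, returns each batch instead of submitting it to a thread pool, and, as its own design choice in the spirit of the discardCount field above, drops and counts entries when the bounded queue is full so that the sending thread never blocks on tracing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class BatchDrainDemo {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
    private final AtomicLong discardCount = new AtomicLong();
    private final int batchSize;

    BatchDrainDemo(int batchSize) {
        this.batchSize = batchSize;
    }

    // Non-blocking append: when the queue is full the entry is dropped and counted.
    boolean append(String ctx) {
        boolean ok = queue.offer(ctx);
        if (!ok) {
            discardCount.incrementAndGet();
        }
        return ok;
    }

    long discarded() {
        return discardCount.get();
    }

    // One pass of the worker loop: take up to batchSize entries, waiting at most 5 ms for each.
    List<String> drainBatch() {
        List<String> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            String ctx;
            try {
                ctx = queue.poll(5, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status, end this batch
                break;
            }
            if (ctx == null) {
                break; // queue is empty for now: ship what has been collected
            }
            batch.add(ctx);
        }
        return batch;
    }

    public static void main(String[] args) {
        BatchDrainDemo d = new BatchDrainDemo(100);
        for (int i = 0; i < 250; i++) {
            d.append("ctx-" + i);
        }
        System.out.println(d.drainBatch().size()); // 100
        System.out.println(d.drainBatch().size()); // 100
        System.out.println(d.drainBatch().size()); // 50
    }
}
```

The short poll timeout keeps latency low when traffic is light, while the batch limit caps how much work a single submitted task carries when traffic is heavy.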
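To improve performance, data taken from the queue is not sent to MQ immediately. Instead, sending is triggered only once a threshold is reached, which can be thought of simply as batching.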
There are two triggering conditions:
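(1) Size: the data size of a trace data segment reaches a size threshold. In my view this part of the RocketMQ 4.9.4 code is questionable, since the newer 5.0 version has optimized it.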
```java
if (currentMsgSize >= traceProducer.getMaxMessageSize()) {
    List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
    AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
    traceExecutor.submit(asyncDataSendTask);
    this.clear();
}
```
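(2) Time: whether the current time minus the time the first record was added to the trace data segment reaches the flush threshold (waitTimeThresholdMil); in other words, buffered data is flushed every 500 ms.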
```java
private void sendDataByTimeThreshold() {
    long now = System.currentTimeMillis();
    for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
        if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
            taskInfo.sendAllData();
        }
    }
}
```
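The two conditions can be combined in a small buffer. The sketch below is a hypothetical analogue of TraceDataSegment, not its actual code: Segment, add, flushIfExpired and sentBatches are invented names, the size is measured as UTF-8 bytes of plain strings, and a flush just records the batch where the real code would submit a send task. The current time is passed in explicitly so the behavior is deterministic and easy to test.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SegmentDemo {
    // Hypothetical analogue of TraceDataSegment: buffers encoded trace records
    // and flushes them when either the size or the time threshold is reached.
    static class Segment {
        private final int maxBytes;      // size threshold
        private final long waitMillis;   // time threshold, e.g. 500 ms
        private final List<String> buffer = new ArrayList<>();
        private int currentBytes = 0;
        private long firstAddTime = -1;
        // Batches that were "sent"; a real implementation would submit a send task instead.
        final List<List<String>> sentBatches = new ArrayList<>();

        Segment(int maxBytes, long waitMillis) {
            this.maxBytes = maxBytes;
            this.waitMillis = waitMillis;
        }

        // Condition 1: flush as soon as the buffered size reaches maxBytes.
        void add(String record, long now) {
            if (buffer.isEmpty()) {
                firstAddTime = now;
            }
            buffer.add(record);
            currentBytes += record.getBytes(StandardCharsets.UTF_8).length;
            if (currentBytes >= maxBytes) {
                flush();
            }
        }

        // Condition 2: called periodically; flush once the oldest record has waited long enough.
        void flushIfExpired(long now) {
            if (!buffer.isEmpty() && now - firstAddTime >= waitMillis) {
                flush();
            }
        }

        private void flush() {
            sentBatches.add(new ArrayList<>(buffer));
            buffer.clear();
            currentBytes = 0;
            firstAddTime = -1;
        }
    }

    public static void main(String[] args) {
        Segment s = new Segment(10, 500);
        s.add("abcd", 0);      // 4 bytes buffered
        s.add("efgh", 100);    // 8 bytes buffered
        s.add("ij", 200);      // 10 bytes: size threshold reached, flushed
        s.add("k", 300);       // a new batch starts at t = 300
        s.flushIfExpired(700); // waited 400 ms: not yet
        s.flushIfExpired(800); // waited 500 ms: flushed
        System.out.println(s.sentBatches); // [[abcd, efgh, ij], [k]]
    }
}
```

The size check bounds the payload of any single trace message, while the time check bounds how long a trace record can sit in memory when traffic is too light to fill a batch.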
Trace data is stored in the following format:
```java
TraceBean bean = ctx.getTraceBeans().get(0);
case Pub: {
    sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)
        .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
        .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)
        .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)
        .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)
        .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
}
break;
```
The figure below shows transaction trace message data; the fields are separated by CONTENT_SPLITOR.
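The record layout above can be illustrated with a self-contained encode/decode round trip. This is a sketch, not RocketMQ's TraceDataEncoder: encode and decode are hypothetical helpers, only the first five Pub fields are used with made-up values, and the two separators are given the same values as TraceConstants.CONTENT_SPLITOR ((char) 1) and TraceConstants.FIELD_SPLITOR ((char) 2). Fields within a record are joined by CONTENT_SPLITOR, and each record ends with FIELD_SPLITOR, so several records can be concatenated and split apart again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class TraceEncodingDemo {
    // Same values as TraceConstants.CONTENT_SPLITOR and TraceConstants.FIELD_SPLITOR.
    static final char CONTENT_SPLITOR = (char) 1;
    static final char FIELD_SPLITOR = (char) 2;

    // Joins the fields of one record with CONTENT_SPLITOR and ends it with FIELD_SPLITOR.
    static String encode(List<String> fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            sb.append(fields.get(i));
            sb.append(i < fields.size() - 1 ? CONTENT_SPLITOR : FIELD_SPLITOR);
        }
        return sb.toString();
    }

    // Splits concatenated records on FIELD_SPLITOR, then each record on CONTENT_SPLITOR.
    static List<List<String>> decode(String data) {
        List<List<String>> records = new ArrayList<>();
        String recordSep = Pattern.quote(String.valueOf(FIELD_SPLITOR));
        String contentSep = Pattern.quote(String.valueOf(CONTENT_SPLITOR));
        for (String record : data.split(recordSep)) {
            if (!record.isEmpty()) {
                records.add(Arrays.asList(record.split(contentSep, -1)));
            }
        }
        return records;
    }

    public static void main(String[] args) {
        // Hypothetical values for the first Pub fields: type, timestamp, region, group, topic
        List<String> pub = Arrays.asList("Pub", "1700000000000", "DefaultRegion", "mytestGroup", "example");
        String body = encode(pub) + encode(pub);  // two records concatenated in one body
        System.out.println(decode(body).size());  // 2
        System.out.println(decode(body).get(0));  // [Pub, 1700000000000, DefaultRegion, mytestGroup, example]
    }
}
```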
Note:
The separator CONTENT_SPLITOR = (char) 1 has the in-memory value 00000001, whereas char i = '1' has the value 49, i.e. 00110001.
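A quick way to see the difference is to print both characters' numeric values and their 8-bit binary forms. The class SplitorValueDemo and its helper bits are hypothetical names for this demo.

```java
public class SplitorValueDemo {
    // Returns the char's numeric value as an 8-digit binary string (valid for values below 256).
    static String bits(char c) {
        String b = Integer.toBinaryString(c);
        return "00000000".substring(b.length()) + b;
    }

    public static void main(String[] args) {
        char contentSplitor = (char) 1;
        char one = '1';
        System.out.println((int) contentSplitor + " " + bits(contentSplitor)); // 1 00000001
        System.out.println((int) one + " " + bits(one));                       // 49 00110001
    }
}
```

Because (char) 1 is a non-printable control character, it is unlikely to appear inside ordinary printable values such as topic names, tags or keys, which makes it a safe separator. Writing the character '1' by mistake would instead split every field that happens to contain the digit 1.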