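1. Basic Concepts

A message trace is the complete chain of information about a single message as it travels from the producer to the Broker and on to the consumer, assembled from the timestamps, status, and other data recorded at each node along the way.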


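Before querying message traces, keep one point in mind: trace data is stored on the Broker. A topic must be defined to hold it, and trace hooks must be registered on both the producer side and the consumer side.

2. Enabling Tracing

2.1 Broker configuration file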

# Enable message tracing
traceTopicEnable=true

2.2 Producer configuration

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)

The producer constructor takes two key parameters:

  • enableMsgTrace: whether to enable message tracing

  • customizedTraceTopic: the topic that stores trace records; the default is RMQ_SYS_TRACE_TOPIC.

Run the following producer code:

import java.util.UUID;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static final String PRODUCER_GROUP = "mytestGroup";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "example";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // The second argument (true) enables message tracing
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();
        try {
            // Use a random UUID as the message key so the message can be looked up later
            String key = UUID.randomUUID().toString();
            System.out.println(key);
            Message msg = new Message(
                TOPIC,
                TAG,
                key,
                ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Sleep for ten seconds so the asynchronously sent trace message has time to go out.
        Thread.sleep(10000);
        producer.shutdown();
    }
}

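In the producer code we set the message's key property, which allows the message to be looked up efficiently.

After the code runs successfully, we view the trace information in the console.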

[Figure: trace details of the message as shown in the console]

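As the figure shows, the trace records the message's store time, the IP of the server that stored it, and the time the send took.

2.3 Consumer configuration

As with the producer, the consumer constructors accept trace parameters:

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace);
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic);

Run the following consumer code: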

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class Consumer {
    public static final String CONSUMER_GROUP = "exampleGruop";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "example";

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // The second argument (true) enables message tracing
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true);
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(TOPIC, "*");
        // msg is the list of messages delivered in this batch
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n",
                Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}


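3. How It Works

Tracing is implemented mainly by adding hooks at the points where the producer sends and the consumer consumes. To understand tracing, then, we only need to understand the logic of those hooks.

The code below is the DefaultMQProducer constructor.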

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
    boolean enableMsgTrace, final String customizedTraceTopic) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    // if client open the message trace feature
    if (enableMsgTrace) {
        try {
            // Asynchronous trace dispatcher
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup,
                TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.defaultMQProducerImpl);
            traceDispatcher = dispatcher;
            // Register the hook that runs when a message is sent
            this.defaultMQProducerImpl.registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));
            // Register the hook that runs when a transaction ends
            this.defaultMQProducerImpl.registerEndTransactionHook(
                new EndTransactionTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

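When the trace switch is on, the constructor creates the asynchronous trace dispatcher AsyncTraceDispatcher and then registers the send-message hook SendMessageTraceHookImpl on the default producer implementation.

// Register the hook that runs when a message is sent
this.defaultMQProducerImpl.registerSendMessageHook(
    new SendMessageTraceHookImpl(traceDispatcher));

The producer's send flow can be simplified to the following code: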

// DefaultMQProducerImpl#sendKernelImpl
this.executeSendMessageHookBefore(context);
// Send the message
this.mQClientFactory.getMQClientAPIImpl().sendMessage(....)
// Executed after the producer has sent the message
this.executeSendMessageHookAfter(context);
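
The before/after pattern in the simplified flow above can be sketched in plain Java. This is only an illustration under stated assumptions: SendHook, HookedSender, and HookDemo are hypothetical names invented here, not RocketMQ classes, and the "send" is a placeholder rather than a network call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; shows the order in which hooks and the send run.
class HookDemo {
    static List<String> run() {
        final HookedSender sender = new HookedSender();
        sender.registerHook(new SendHook() {
            @Override
            public void before(String topic) {
                sender.events.add("before:" + topic);
            }

            @Override
            public void after(String topic) {
                sender.events.add("after:" + topic);
            }
        });
        sender.send("example");
        return sender.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical stand-in for a send hook; not the RocketMQ SendMessageHook interface.
interface SendHook {
    void before(String topic);

    void after(String topic);
}

// Hypothetical sender that wraps every send with the registered hooks.
class HookedSender {
    private final List<SendHook> hooks = new ArrayList<>();
    final List<String> events = new ArrayList<>();

    void registerHook(SendHook hook) {
        hooks.add(hook);
    }

    void send(String topic) {
        for (SendHook hook : hooks) {
            hook.before(topic);       // a trace hook would build its context here
        }
        events.add("send:" + topic);  // placeholder for the real network call
        for (SendHook hook : hooks) {
            hook.after(topic);        // a trace hook would record the result here
        }
    }
}
```

Because the sender only knows the hook interface, tracing can be switched on or off by registering or not registering a hook, without touching the send path itself.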

Now look inside the SendMessageTraceHookImpl class. It has two main methods: sendMessageBefore and sendMessageAfter.

1. The sendMessageBefore method

public void sendMessageBefore(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
        return;
    }
    //build the context content of TuxeTraceContext
    TraceContext tuxeContext = new TraceContext();
    tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
    context.setMqTraceContext(tuxeContext);
    tuxeContext.setTraceType(TraceType.Pub);
    tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
    //build the data bean object of message trace
    TraceBean traceBean = new TraceBean();
    traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
    traceBean.setTags(context.getMessage().getTags());
    traceBean.setKeys(context.getMessage().getKeys());
    traceBean.setStoreHost(context.getBrokerAddr());
    traceBean.setBodyLength(context.getMessage().getBody().length);
    traceBean.setMsgType(context.getMsgType());
    tuxeContext.getTraceBeans().add(traceBean);
}

2. The sendMessageAfter method

public void sendMessageAfter(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
        || context.getMqTraceContext() == null) {
        return;
    }
    if (context.getSendResult() == null) {
        return;
    }

    if (context.getSendResult().getRegionId() == null
        || !context.getSendResult().isTraceOn()) {
        // if switch is false,skip it
        return;
    }

    TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
    TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
    int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
    tuxeContext.setCostTime(costTime);
    if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
        tuxeContext.setSuccess(true);
    } else {
        tuxeContext.setSuccess(false);
    }
    tuxeContext.setRegionId(context.getSendResult().getRegionId());
    traceBean.setMsgId(context.getSendResult().getMsgId());
    traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
    traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
    localDispatcher.append(tuxeContext);
}

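The trace object stores costTime (the time spent sending the message), success (whether the send succeeded), regionId (the region of the Broker the message was sent to), msgId (the globally unique message ID), offsetMsgId (the message's physical offset), and storeTime (the store time).

The store time is not the message's actual store time but an estimate: the client's send timestamp plus half of the send cost is taken as the time the message was stored.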
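
The estimate can be checked with a small worked example. The sketch below mirrors only the arithmetic of sendMessageAfter for a single message; StoreTimeEstimate and its parameters are hypothetical names introduced here, and the timestamps are made-up values.

```java
// Hypothetical helper for illustration; not part of RocketMQ.
class StoreTimeEstimate {
    // Start timestamp plus half of the measured round trip, using integer
    // division as in the hook code (costTime is an int there as well).
    static long estimate(long sendStartMillis, long nowMillis) {
        int costTime = (int) (nowMillis - sendStartMillis);
        return sendStartMillis + costTime / 2;
    }

    public static void main(String[] args) {
        // A send that started at t = 1000 ms and returned at t = 1030 ms
        System.out.println(estimate(1000L, 1030L));
    }
}
```

The underlying assumption is that the request and the response take about the same time on the wire, so the Broker stored the message roughly halfway through the round trip.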

Finally, the trace context is appended to the local trace dispatcher:

localDispatcher.append(tuxeContext);

Next, let's look at how the trace dispatcher works:

public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
    // queueSize is greater than or equal to the n power of 2 of value
    this.queueSize = 2048;
    this.batchSize = 100;
    this.maxMsgSize = 128000;
    this.discardCount = new AtomicLong(0L);
    this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
    this.group = group;
    this.type = type;

    this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
    if (!UtilAll.isBlank(traceTopicName)) {
        this.traceTopicName = traceTopicName;
    } else {
        this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
    }
    this.traceExecutor = new ThreadPoolExecutor(
        10,
        20,
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.appenderQueue,
        new ThreadFactoryImpl("MQTraceSendThread_"));
    traceProducer = getAndCreateTraceProducer(rpcHook);
}

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}

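The code above shows the dispatcher's constructor and its start method. The constructor creates traceExecutor, a thread pool for sending messages, and calling start launches a worker thread.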
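
To make the pool parameters concrete, here is a minimal sketch of an executor built with the same numbers as the constructor above (10 core threads, 20 maximum, a 60-second keep-alive, and a bounded queue of 2048 tasks). TraceExecutorSketch is a hypothetical name, the default thread factory replaces ThreadFactoryImpl, and the tasks only increment a counter instead of sending anything.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; only the pool sizing is taken from the text above.
class TraceExecutorSketch {
    // Runs taskCount trivial tasks and returns how many completed.
    // With the default rejection policy, submissions beyond what the queue
    // and the 20 threads can hold would throw RejectedExecutionException.
    static int runTasks(int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,                                       // core pool size
            20,                                       // maximum pool size
            1000 * 60,                                // keep-alive for idle extra threads
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(2048));  // bounded task queue
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> completed.incrementAndGet());
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(50));
    }
}
```

Because the queue is bounded, a burst of trace data cannot grow memory without limit; the extra threads beyond the core 10 are only created once the queue is full.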

class AsyncRunnable implements Runnable {
    private boolean stopped;

    @Override
    public void run() {
        while (!stopped) {
            List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
            synchronized (traceContextQueue) {
                for (int i = 0; i < batchSize; i++) {
                    TraceContext context = null;
                    try {
                        //get trace data element from blocking Queue - traceContextQueue
                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                    }
                    if (context != null) {
                        contexts.add(context);
                    } else {
                        break;
                    }
                }
                if (contexts.size() > 0) {
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                    traceExecutor.submit(request);
                } else if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }
        }

    }
}

Once started, the worker keeps taking trace contexts out of the trace context queue traceContextQueue and converts them into trace data segments (TraceDataSegment).
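
The polling loop in AsyncRunnable can be reduced to a small, self-contained sketch. It assumes the same shape as the code above (at most batchSize items per round, a 5 ms poll timeout), but BatchDrain and its methods are hypothetical names, and the interrupt handling is a choice made for this sketch rather than what RocketMQ does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the batching loop; not a RocketMQ class.
class BatchDrain {
    // Takes up to batchSize items, stopping early once a 5 ms poll comes back empty.
    static <T> List<T> drain(BlockingQueue<T> queue, int batchSize) {
        List<T> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            T item = null;
            try {
                item = queue.poll(5, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                break;
            }
            if (item == null) {
                break;
            }
            batch.add(item);
        }
        return batch;
    }

    // Fills a queue with `total` items and reports the size of each batch drained from it.
    static List<Integer> batchSizes(int total, int batchSize) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(Math.max(1, total));
        for (int i = 0; i < total; i++) {
            queue.add(i);
        }
        List<Integer> sizes = new ArrayList<>();
        List<Integer> batch;
        while (!(batch = drain(queue, batchSize)).isEmpty()) {
            sizes.add(batch.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(250, 100));
    }
}
```

With 250 queued items and a batch size of 100, the loop hands off two full batches and one partial batch, which is the behavior the worker relies on to avoid one send per trace record.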

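To improve performance, data is not sent to MQ every time something is taken from the queue. The send is triggered only once enough data has accumulated, which can be thought of simply as batching.

There are two triggers: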

  1. The size of the trace data segment reaches a size threshold. The author finds this part of the RocketMQ 4.9.4 code questionable, since the latest 5.0 version optimized it.

if (currentMsgSize >= traceProducer.getMaxMessageSize()) {
    List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
    AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
    traceExecutor.submit(asyncDataSendTask);
    this.clear();
}

  2. The current time minus the time the first record was added to the segment reaches the flush interval, that is, a flush every 500 milliseconds.

private void sendDataByTimeThreshold() {
    long now = System.currentTimeMillis();
    for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
        if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
            taskInfo.sendAllData();
        }
    }
}
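
The two triggers can be combined in one small buffer, sketched below under stated assumptions. ThresholdBuffer is a hypothetical class, not the RocketMQ TraceDataSegment: it uses string length as a stand-in for message size, takes the current time as a parameter so the behavior is deterministic, and collects flushed batches in a list instead of sending them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a buffer with a size trigger and a time trigger.
class ThresholdBuffer {
    private final int maxSize;
    private final long maxWaitMillis;
    private final List<String> pending = new ArrayList<>();
    private int pendingSize = 0;
    private long firstAddTime = 0L;
    final List<List<String>> flushed = new ArrayList<>();

    ThresholdBuffer(int maxSize, long maxWaitMillis) {
        this.maxSize = maxSize;
        this.maxWaitMillis = maxWaitMillis;
    }

    // Adds a record and flushes at once if the accumulated size reaches the threshold.
    void add(String record, long nowMillis) {
        if (pending.isEmpty()) {
            firstAddTime = nowMillis;   // remember when this batch started
        }
        pending.add(record);
        pendingSize += record.length();
        if (pendingSize >= maxSize) {
            flush();
        }
    }

    // Called periodically; flushes if the oldest pending record has waited long enough.
    void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - firstAddTime >= maxWaitMillis) {
            flush();
        }
    }

    private void flush() {
        flushed.add(new ArrayList<>(pending));
        pending.clear();
        pendingSize = 0;
    }

    public static void main(String[] args) {
        ThresholdBuffer buffer = new ThresholdBuffer(10, 500L);
        buffer.add("aaaa", 0L);
        buffer.tick(500L);          // time trigger
        buffer.add("bbbbbb", 600L);
        buffer.add("cccc", 610L);   // size trigger
        System.out.println(buffer.flushed);
    }
}
```

The size trigger bounds how large a single trace message can get, while the time trigger bounds how long a record can sit in the buffer when traffic is low.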

Trace data is stored in the following format:

TraceBean bean = ctx.getTraceBeans().get(0);
//append the content of context and traceBean to transferBean's TransData
case Pub: {
    sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)
        .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
        .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)
        .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)
        .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)
        .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)
        .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
}
break;

The figure below shows the data of a transaction trace message; the fields are separated by CONTENT_SPLITOR.

[Figure: transaction trace message data]

Note:

The separator CONTENT_SPLITOR = (char) 1 has the in-memory value 00000001, whereas char i = '1' has the value 49, that is, 00110001.
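
The difference is easy to verify in code. The sketch below defines its own constant with the value described above and uses it to join and split a few fields; TraceSeparatorDemo, join, and split are hypothetical names, and the field values are made up for illustration.

```java
// Hypothetical illustration; the constant is redefined locally rather than
// imported from RocketMQ.
class TraceSeparatorDemo {
    // Control character U+0001, not the digit '1' (U+0031)
    static final char CONTENT_SPLITOR = (char) 1;

    static String join(String... fields) {
        return String.join(String.valueOf(CONTENT_SPLITOR), fields);
    }

    static String[] split(String record) {
        // The limit -1 keeps trailing empty fields instead of dropping them
        return record.split(String.valueOf(CONTENT_SPLITOR), -1);
    }

    public static void main(String[] args) {
        System.out.println((int) CONTENT_SPLITOR); // numeric value of the separator
        System.out.println((int) '1');             // numeric value of the digit character
        String record = join("Pub", "TagA1", "example");
        System.out.println(split(record).length);
    }
}
```

Since the separator is a non-printing control character, ordinary field content such as a tag containing the digit 1 cannot be mistaken for a field boundary.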

