1. 订阅与发布

消息的发布是指某个生产者向某个 topic 发送消息;消息的订阅是指某个消费者关注了某个 topic 中带有某些 tag 的消息,进而从该 topic 消费数据。

  1. 点对点(P2P)模式:

    在点对点模式中,消息发送者(⽣产者)将消息发送到⼀个特定的队列,⽽消息接收者(消费者)从该队列中接收消息。消息在队列中存储,⼀旦⼀个消息被消费者接收,它就从队列中移除,这确保了每个消息只被⼀个消费者处理。

​ 这种模式适⽤于⼀对⼀的通信,其中⼀个⽣产者向⼀个特定的消费者发送消息,确保消息的可靠传递和处理。

  1. 发布订阅(Pub/Sub)模式:

    在发布订阅模式中,消息发送者将消息发布到⼀个主题(topic),⽽消息订阅者则订阅感兴趣的主题。每个主题可以有多个订阅者,因此消息会被⼴播到所有订阅了相同主题的消费者。

​ 这种模式适⽤于⼀对多或多对多的通信,允许多个消费者同时接收和处理相同主题的消息。

发布订阅模式通常⽤于构建实时事件处理系统、⽇志处理、通知系统等,其中多个消费者需要订阅相同类型的消 息并进⾏处理。

点对点模式适⽤于⼀对⼀的通信,确保消息的可靠传递给⼀个特定的消费者,⽽发布订阅模式适⽤于⼀对多或多对多的通信,允许多个消费者同时接收相同主题的消息,⽤于构建实时事件系统和⼴播通信。

2. 消息顺序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

3. 消息过滤

RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。

4. 消息可靠性

RocketMQ 支持消息的高可靠,影响消息可靠性的几种情况:

  1. Broker 非正常关闭
  2. Broker 异常 Crash
  3. OS Crash
  4. 机器掉电,但是能立即恢复供电情况
  5. 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)
  6. 磁盘设备损坏

1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。注:RocketMQ 从 3.0 版本开始支持同步双写。

5. 至少一次 At least Once

至少一次(At least Once)指每个消息必须投递一次。Consumer 先 Pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会 ack 消息,所以 RocketMQ 可以很好的支持此特性。

6. 精确一次 Exactly Only Once

  1. 发送消息阶段,不允许发送重复的消息。

  2. 消费消息阶段,不允许消费重复的消息。

只有以上两个条件都满⾜情况下,才能认为消息是“Exactly Only Once”,⽽要实现以上两点,在分布式系统环境下,不

可避免要产⽣巨⼤的开销。所以 RocketMQ 为了追求⾼性能,并不保证此特性,要求在业务上进⾏去重, 也就是说消费

消息要做到幂等性。RocketMQ 虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消 费情况,只有⽹络

异常,Consumer 启停等异常情况下会出现消息重复。

此问题的本质原因是⽹络调⽤存在不确定性,即既不成功也不失败的第三种状态,所以才产⽣了消息重复性问题

7. Broker 的 Buffer 满了怎么办?

Broker 的 Buffer 通常指的是 Broker 中⼀个队列的内存 Buffer ⼤⼩,这类 Buffer 通常⼤⼩有限,如果 Buffer 满 了以

后怎么办?下⾯是 CORBA Notification 规范中处理⽅式:

(1). RejectNewEvents 拒绝新来的消息,向 Producer 返回 RejectNewEvents 错误码。

(2). 按照特定策略丢弃已有消息

  1. AnyOrder - Any event may be discarded on overflow. This is the default setting for this property.

  2. FifoOrder - The first event received will be the first discarded.

  3. LifoOrder - The last event received will be the first discarded.

  4. PriorityOrder - Events should be discarded in priority order, such that lower priority events will be

discarded before higher priority events.

  1. DeadlineOrder - Events should be discarded in the order of shortest expiry deadline first.

RocketMQ 没有内存 Buffer 概念,RocketMQ 的队列都是持久化磁盘,数据定期清除。对于此问题的解决思路,RocketMQ 同其他 MQ 有⾮常显著的区别,RocketMQ 的内存 Buffer 抽象成⼀个⽆限⻓度的队列,不管有多少数据进来都能装得下,这个⽆限是有前提的,Broker 会定期删除过期的数据,例如 Broker 只保存 3 天的消息,那么这个 Buffer 虽然⻓度⽆限,但是 3 天前的数据会被从队尾删除。

8. 回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

9. 事务消息

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

10. 定时消息

定时消息(延迟队列)是指消息发送到 broker 后,不会立即被消费,等待特定时间投递给真正的 topic。 broker 有配置项 messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18 个 level。可以配置自定义 messageDelayLevel。注意,messageDelayLevel 是 broker 的属性,不属于某个 topic。发消息时,设置 delayLevel 等级即可:msg.setDelayLevel(level)。level 有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1 <= level <= maxLevel,消息延迟特定时间,例如 level == 1,延迟 1s
  • level > maxLevel,则 level == maxLevel,例如 level== 20,延迟 2h

定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。

需要注意的是,定时消息会在第一次写入和调度写入真实 topic 时都会计数,因此发送数量、tps 都会变高。

11. 消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10 秒后再重试。
  • 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

RocketMQ 会为每个消费组都设置一个 Topic 名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至“%RETRY%+consumerGroup”的重试队列中。

12. 消息重投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为 2,因此生产者会最多尝试发送 retryTimesWhenSendFailed + 1 次。不会选择上次失败的 broker,尝试向其他 broker 发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或 slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他 broker,默认 false。十分重要消息可以开启。

13. 流量控制

生产者流控,因为 broker 处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

生产者流控:

  • commitLog 文件被锁时间超过 osPageCacheBusyTimeOutMills 时,参数默认为 1000ms,返回流控。
  • 如果开启 transientStorePoolEnable == true,且 broker 为异步刷盘的主机,且 transientStorePool 中资源不足,拒绝当前 send 请求,返回流控。
  • broker 每隔 10ms 检查 send 请求队列头部请求的等待时间,如果超过 waitTimeMillsInSendQueue,默认 200ms,拒绝当前 send 请求,返回流控。
  • broker 通过拒绝 send 请求方式实现流量控制。

注意,生产者流控,不会尝试消息重投。

消费者流控:

  • 消费者本地缓存消息数超过 pullThresholdForQueue 时,默认 1000。
  • 消费者本地缓存消息大小超过 pullThresholdSizeForQueue 时,默认 100MB。
  • 消费者本地缓存消息跨度超过 consumeConcurrentlyMaxSpan 时,默认 2000。

消费者流控的结果是降低拉取频率。

14. 死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

15. 消息堆积

消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要 求消息中间件具有⼀定的消息堆积能⼒,消息堆积分以下两种情况:

  1. 消息堆积在内存 Buffer,⼀旦超过内存 Buffer,可以根据⼀定的丢弃策略来丢弃消息,如 CORBA Notification 规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能⼒主要在于内存 Buffer ⼤⼩,⽽且消息堆积后,性能下降不会太⼤,因为内存中数据多少对于对外提供的访问能⼒影响有限。

  2. 消息堆积到持久化存储系统中,例如 DB ,KV 存储,⽂件记录形式。当消息不能在内存 Cache 命中时,要不可避免的访问磁盘,会产⽣⼤量读 IO,读 IO 的吞吐量直接决定了 消息堆积后的访问能⼒。

评估消息堆积能⼒主要有以下四点:

  1. 消息能堆积多少条,多少字节 ? 即消息的堆积容量。

  2. 消息堆积后,发消息的吞吐量⼤⼩,是否会受堆积影响 ?

  3. 消息堆积后,正常消费的 Consumer 是否会受影响 ?

  4. 消息堆积后,访问堆积在磁盘的消息时,吞吐量有多⼤ ?

16. 分布式事务

已知的⼏个分布式事务规范,如 XA,JTA 等。其中 XA 规范被各⼤数据库⼚商⼴泛⽀持,如 Oracle,Mysql 等。 其中 XA 的 TM 实现佼佼者如 Oracle Tuxedo,在⾦融、电信等领域被⼴泛应⽤。

分布式事务涉及到两阶段提交问题,在数据存储⽅⾯的⽅⾯必然需要 KV 存储的⽀持,因为第⼆阶段的提交回滚需要修改消息状态,⼀定涉及到根据 Key 去查找 Message 的动作。RocketMQ 在第⼆阶段绕过了根据 Key 去查找 Message 的问题,采⽤第⼀阶段发送 Prepared 消息时,拿到了消息的 Offset,第⼆阶段通过 Offset 去访问消息, 并修改状态,Offset 就是数据的地址。

RocketMQ 这种实现事务⽅式,没有通过 KV 存储做,⽽是通过 Offset ⽅式,存在⼀个显著缺陷,即通过 Offset 更改数据,会令系统的脏⻚过多,需要特别关注。


本站由 卡卡龙 使用 Stellar 1.27.0 主题创建

本站访问量 次. 本文阅读量 次.