1. 高可用

1.1 leader选举

Kafka 中的选举大致分为三大类: 控制器的选举、Leader 的选举、消费者的选举。在讲解 Leader 选举之前, 先说说 Kafka 控制器, 即 Broker。它除了具有一般 Broker 的功能外, 还具有选举分区Leader节点的功能, 在启动 Kafka 系统时候, 其中一个 Broker 会被选举为控制器, 负责管理主题分区和副本的状态, 还会执行重分配的任务。

控制器的启动顺序如下:

  1. 第一个启动的节点,会在 Zookeeper 系统里面创建一个临时节点 /controller ,并写入该节点的注册信息,使该节点成为控制器。

  2. 其他的节点在陆续启动时,也会尝试在 Zookeeper 系统中创建 /controller 节点,但是 /controller 节点已经存在,所以会抛出 “创建/controller节点失败异常” 的信息。创建失败的节点会根据返回的结果,判断出在 Kafka 集群中已经有一个控制器被成功创建了,所以放弃创建 /controller 节点,这样就确保了 Kafka 集群控制器的唯一性。

  3. 其他的节点,也会在控制器上注册相应的监听器,各个监听器负责监听各自代理节点的状态变化。当监听到节点状态发生变化时,会触发相应的监听函数进行处理。

1.2 多副本

在 Kafka 中 Topic 被分为多个分区(Partition),分区是 Kafka 最基本的存储单位。在创建主题的时候可使用replication-factor参数指定分区的副本个数。分区副本总会有一个 Leader 副本,所有的消息都直接发送给Leader 副本,其它副本都需要通过复制 Leader 中的数据来保证数据一致。当 Leader 副本不可用时,其中一个 Follower 将会被选举并成为新的 Leader。

1.3 ISR机制

image-20240608115608774

如上图所示, 每个分区都有一个 ISR(in-sync Replica) 列表,用于维护所有同步的、可用的副本。Leader 副本必然是同步副本,也就是说, ISR 不只是追随者副本集合, 它比如包括 Leader 副本。甚至在某些情况下, ISR 只有Leader 这一个副本, 而对于 Follower 副本来说,它需要满足以下条件才能被认为是同步副本:

1) 必须定时向 Zookeeper 发送心跳;

2) 在规定的时间内从 Leader 副本 “低延迟” 地获取过消息。

如果副本不满足上面条件的话,就会被从 ISR 列表中移除,直到满足条件才会被再次加入。所以就可能会存在 Follower 不可能与 Leader 实时同步的风险。

Kafka 判断 Follower 是否与 Leader 同步的条件就是 Broker 端参数 replica.lag.time.max.ms 参数值。这个参数的含义就是 Follower 副本能够落后 Leader 副本的最长时间间隔, 当前默认值为10秒, 也就是说, 只要一个Follower 副本落后 Leader 副本的时间不连续超过10秒, Kafka 就认为两者是同步的, 即使 Follower 副本中保持的消息要少于 Leader 副本中的消息。

Kafka中ISR的管理最终都会反馈到 Zookeeper节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。目前有两个地方会对这个Zookeeper的节点进行维护:

1) Controller来维护:Kafka 集群中的其中一个 Broker 会被选举为Controller,主要负责 Partition 管理和副本状态管理,也会执行重分配 Partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 Leader,ISR 和新的 leader_epoch 及controller_epoch 写入 Zookeeper 的相关节点中。同时发起 leaderAndIsrRequest 通知所有的 Replicas。

2) Leader来维护:Leader 有单独的线程定期检测 ISR 中 Follower 是否脱离 ISR , 如果发现 ISR 变化,则会将新的 ISR 信息返回到 Zookeeper 的相关节点中。

1.4 ACK机制

这个acks参数在kafka的使用中,是非常核心以及关键的一个参数,决定了很多东西, 这个acks跟副本机制,同步机制,ISR机制都密切相关, 如果无法理解这些,是无法充分理解acks参数的含义。

首先这个acks参数,是在KafkaProducer,也就是生产者客户端里设置的。那么也就是说,你往kafka写数据的时候,就可以来设置这个acks参数。这个参数实际上有三种常见的值可以设置,分别是:0、1 和 all。

  • acks = 0
    如果acks设置为0,那么 Producer 是不会等待 Broker 的反馈。该消息会被立刻添加到 Socket Buffer 中就认为已经发送完成。在这种情况下,服务器端是否收到请求是无法保证的,并且参数 Retries 也不会生效(因为客户端无法获得失败信息)。

    这个时候每个记录返回的 Offset 总是被设置为-1。这个模式下 Kafka 的吞吐量最大,并发最高,但是数据非常容易丢失,通常适用在一些记录应用日志,对数据要求不高的业务场景。

  • acks = 1
    如果acks设置为1,这个时候 Leader 节点会将记录先写入本地日志,并且在所有 Follower 节点反馈之前就先确认成功。在这种情况下,如果 Leader 节点在接收记录之后,并且在 Follower 节点复制数据完成之前发生错误,那么这条记录会丢失。这个模式和 Mysql 的主从异步复制一样,主从之间会有数据差异,此配置为 Kafka 默认配置。它平衡了数据安全和性能。

  • acks = all & min.insync.replicas >= 2

    如果acks设置为all,这个时候 Leader 节点会等待所有同步中的LSR副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。

    如果说 Leader 这时候刚接收到了消息,但是 Follower 没有收到消息,此时 Leader 宕机了,那么客户端会感知到这个消息没发送成功,他会重试再次发送消息过去。

    其中Broker有个配置项min.insync.replicas(默认值为1)代表了正常写入生产者数据所需要的最少ISR个数, 当ISR中的副本数量小于min.insync.replicas时,Leader停止写入生产者生产的消息,并向生产者抛出NotEnoughReplicas异常,阻塞等待更多的 Follower 赶上并重新进入ISR, 因此能够容忍min.insync.replicas-1个副本同时宕机。

    这种方式是牺牲了性能为代价,适合对数据要求比较高的业务场景。

2. 高性能

2.1 服务端高性能

2.1.1 Reactor多路复用

2.1.2 顺序写磁盘

2.1.3 OS Cache

2.1.4 日志设计

2.1.5 零拷贝计数

2.1.6 压缩传输

2.1.7 内存压缩mmap

2.2 生产端高性能

2.2.1 批处理设计

2.2.2 内存池设计

2.3 消费端高性能

2.3.1 无锁化offset管理

3. 高并发

3.1 超高并发的网络架构设计

这里我们将 Kafka 的网络架构抽象成如上图所示的三层架构, 整个请求流转的路径如下:

  1. 客户端发送请求过来, 在Kafka 服务端会有个Acceptor线程, 这个线程上面绑定了OP_ACCEPT事件, 用来监听发送过来的请求, 下面有个while死循环会源源不断的监听Selector是否有请求发送过来, 接收到请求链接后封装成socketchannel, 然后将socketChannel发送给网络第一层架构中。

  2. 在第一层架构中有3个一模一样的Processor线程, 这个线程的里面都有一个连接队列,里面存放socketchannel, 存放规则为轮询存放, 随着请求的不断增加, 连接队列里面就会有很多个socketchannel, 这个时候socketchannel就会在每个selector上面注册OP_READ事件, 参考上图第一层的第三个Processor线程, 即每个线程里面还有一个while循环会遍历每个socketchannel, 监听到事件后就会接收到客户端发送过来的请求, 这个时候Processor线程会对请求进行解析(发送过来的请求是二进制的, 上面已经说过, 跨网络传输需要进行序列化) , 并解析封装成Request对象发送到上图所示的网络第二层架构中

  3. 在第二层架构中会有两个队列, 一个RequestQueue(请求队列), 一个是ResponseQueue(返回队列), 在请求队列中会存放一个个Request请求, 起到缓冲的作用, 这个时候就到了网络第三层架构中。

  4. 在第三层架构中有个RequestHandler线程池, 里面默认有8个RequestHandler线程, 这8个线程启动后会不断的从第二层的RequestQueue队列中获取请求, 解析请求体里面的数据, 通过内置工具类将数据写入到磁盘

  5. 写入成功后还要响应客户端, 这个时候会封装一个Response对象, 会将返回结果存放到第二层的ResponseQueue队列中, 此时默认有3个小的Response队列, 这里面的个数是同第一层架构中的Processor线程一一对应的

  6. 这个时候第一层的Processor线程中while循环就会遍历Response请求, 遍历完成后就会在selector上注册OP_WRITE事件, 这个时候就会将响应请求发送回客户端。

  7. 在整个过程中涉及到2个参数:num.network.threads = 3 和 num.io.threads = 8 如果感觉默认参数性能不够好的话, 可以对这2个参数进行优化, 比如将num.network.threads = 9, num.io.threads = 32(和CPU个数要一致), 每个RequestHandler线程可以处理2000QPS, 2000 * 8 = 1.6万QPS , 扩容后可以支撑6.4万QPS, 通过扩容后Kafka可以支撑6万QPS, 可以看出通过上面的架构讲解, kafka是可以支撑高并发的请求的


本站由 卡卡龙 使用 Stellar 1.27.0 主题创建

本站访问量 次. 本文阅读量 次.