Kafka中采用了多副本的机制,这是大多数分布式系统中惯用的手法,以此来实现水平扩展、提供容灾能力、提升可用性和可靠性等。我们对此可以引申出 一系列的疑问:

  • Kafka多副本之间如何进行数据同步,尤其是在发生异常时候的处理机制又是什么?

  • 多副本间的数据一致性如何解决?

  • 一致性协议又是什么?

  • 如何确保Kafka的可靠性?

  • Kafka中的可靠性和可用性之间的关系又如何?

1. 副本剖析

副本 (Replica) 是分布式系统中常见的概念之一,指的是分布式系统对数据和服务提供的一种冗余方式。在常见的分布式系统中,为了对外提供可用的服务,我们往往会对数据和服务进行副本处理。数据副本是指在不同的节点上持久化同一份数据,当某一个节点上存储的数据丢失时,可以从副本上读取该数据,这是解决分布式系统数据丢失问题最有效的手段。另一类副本是服务副本,指多个节点提供同样的服务,每个节点都有能力接收来自外部的请求并进行相应的处理。

组成分布式系统的所有计算机都有可能发生任何形式的故障。 一个被大量工程实践所检验过的 “黄金定理” :任何在设计阶段考虑到的异常情况,一定会在系统实际运行中发生,并且在系统实际运行过程中还会遇到很多在设计时未能考虑到的异常故障。所以,除非需求指标允许,否则在系统设计时不能放过任何异常情况。

Kafka从0.8版本开始为分区引入了多副本机制,通过增加副本数量来提升数据容灾能力。同时,Kafka 通过多副本机制实现故障自动转移,在 Kafka集群中某个broker 节点失效的情况下仍然保证服务可用。

• 副本是相对于分区而言的,即副本是特定分区的副本。

• 一个分区中包含一个或多个副本,其中一个为leader副本,其余为 follower副本,各个副本位千不同的broker节点中。只有leader副本对外提供服务,follower副本只负责数据同步。

• 分区中的所有副本统称为AR, 而ISR是指与leader副本保持同步状态的副本集合,当然leader副本本身也是这个集合中的

一员。

• LEO标识每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的LEO,ISR中最小的LEO即为HW, 俗称高水位,消费者只能拉取到HW之前的消息。

从生产者发出的一条消息首先会被写入分区的leader 副本,不过还需要等待ISR集合中的所有follower副本都同步完之后才能被认为已经提交,之后才会更新分区的HW, 进而消费者可以消费到这条消息。

1.1 失效副本

正常情况下,分区的所有副本都处于ISR集合中,但是难免会有异常情况发生,从而某些副本被剥离出ISR集合中。在ISR集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统称为失效副本,失效副本对应的分区也就称为同步失效分区,即under-replicated分区。

正常情况下,我们通过kafka-topics.sh脚本的under-replicated-partitions 参数来显示主题中包含失效副本的分区时结果会返回空。比如我们来查看下主题topic-partitions的相关信息:

将集群中的brokerid为2的节点关闭,再来执行命令,结果显示如下:

可以看到主题topic-partitions 中的三个分区都为under-replicated分区,因为它们都有副本处于下线状态,即处于功能失效状态。

失效副本不仅是指处于失效状态的副本,处于同步失效状态的副本也可以看作失效副本。怎么判定一个分区是否有副本处于同步失效的状态呢?Kafka从0.9x版本开始就通过唯一的broker端参数replica.lag.time.max.ms来抉择,当ISR集合中的一个follower副本滞后leader副本的时间超过此参数指定的值时则判断为同步失败,需要将此follower副本剔除出ISR集合。该参数的默认值是100000;

副本失效的情况:

  • follower副本进程卡住,在一段时间内根本没有向leader副本发起同步请求,比如频繁的Full GC
  • follower副本进程同步过慢,在一段时间内都无法追赶上leader副本,比如I/O开销过大

如果通过工具增加了副本因子,那么新增加的副本在赶上leader副本之前也都是处于失效状态的。如果一个follower副本因子由于某些原因而下线,再次上线后,在追赶上leader之前也处于失效状态。

在 0.9.x 版本之前,Kafka中还有另一个参数 replica.lag.max.messages (默认值为4000) , 它也是用来判定失效副本的,当一个 follower 副本滞后 leader 副本的消息数超过replica.lag.max.messages 的大小时 ,则判定它处于同步失效的状态。它与replica.lag. time.max.ms参数判定出的失效副本取并集组成一个失效副本的集合, 从而进一步剥离出分区的ISR集合。

不过这个rep巨ca.lag.max.messages参数很难给定一个合适的值,若设置得太大,则这个参数本身就没有太多意义,若设置得太小则会让follower副本反复处于同步、未同步、同步的死循环中,进而又造成ISR集合的频繁伸缩。而且这个参数是broker级别的,也就是说,对broker中的所有主题都生效。以默认的值4000为例, 对于消息流入速度很低的主题(比如TPS为10)这个参数并无用武之地;而对于消息流入速度很高的主题(比如TPS为20000) , 这个参数的取值又会引入ISR的频繁变动。

具有失效副本的分区可以从侧面反映出Kafka集群的很多问题,毫不夸张地说:如果只用一个指标来衡量Kafka, 那么同步失效分区(具有失效副本的分区)的个数必然是首选。

1.2 ISR的伸缩

Kafka 在启动的时候会开启两个与ISR相关的定时任务,名称分别为”isr-expiration”和”isr-change-propagation”。isr-expiration任务会周期性地检测每个分区是否需要缩减其ISR集合。这个周期和replica.lag.time.max.ms参数有关,大小是这个参数值的一半,默认值为5000ms。当检测到ISR集合中有失效副本 时,就会收缩ISR集合 。

1.3 LEO和HW

对于副本而言,还有两个概念:本地副本、远程副本,本地副本是指对应的log分配在当前的broker节点上,远程副本是指对应的log分配在其他的broker节点上。在kafka中,同一个分区的信息会存在多个broker节点上,并被其上的副本管理器所管理,这样在逻辑层面每个broker节点上的分区就有了多个副本,但是只有本地副本才有对应的日志,某个分区有三个副本分别位于broker0、broker1、broker2节点中,其中带阴影的放款表示本地副本。假设broker0上的副本1为当前分区的leader副本,那么副本2和副本3就是follower副本,整个消息追加的过程可以概括如下:

  • 生产者客户端发送消息leader副本中。
  • 消息被追加到leader副本的本地日志中,并且会更新日志的偏移量。
  • follower副本向leader副本请求同步数据。
  • leader副本所在的服务器读取本地日志,并更新对应拉取的follower副本的消息。
  • leader副本所在的服务器将拉取结果返回给follower副本。
  • follower副本收到leader副本并返回的拉取结果,将消息追加到本地日志中,并更新日志的偏移量信息。

1.4 Leader Epoch的介入

1.5 为什么不支持读写分离

在Kafka中 , 生产者写入消息、 消费者读取消息的操作都 是与leader副本进行交互的, 从而实现的是 一种主写主读的生产消费模型。数据库、Redis等都具备主写主读的功能, 与此同时还支持主写从读的功能,主写从读也就是读写分离,为了与主写主读对应,这里就以主写从读来称呼。Kafka 并不支待主写从读,这 是为什么呢?

从代码层面上来说,虽然增加了代码复杂度,但在 Kafka 中这种功能完全可以支待。对于这个问题,我们可以从 “ 收益点 ” 这个角度来做具体分析。主写从读可以让从节点去分担主节点的负载压力,预防主节点负载过重而从节点却空闲的情况发生。但是主写从读也有2个很明显的缺点:

  • 数据一致性问题
  • 延时问题;对延时敏感的应用 而言,主写从读的功能并不太适用。

现实情况下,很多应用既可以忍受一定程度上的延时,也可以忍受一段时间内的数据不一致的情况,那么对千这种情况,Kafka 是否有必要支持主写从读的功能呢?

主读从写可以均摊一定的负载却不能做到完全的负载均衡,比如对于数据写压力很大而读压力很小的情况,从节点只能分摊很少的负载压力,而绝大多数压力还是在主节点上。而在 Kafka中却可以达到很大程度上的负载均衡, 而且这种均衡是在主写主读的架构上实现的。我们来看一下 Kafka的生产消费模型。

image-20240615195126008

Kafka 集群中有3个分区,每个分区有3个副本,正好均匀地分布在3个 broker 上,灰色阴影的代表 leader副本,非灰色阴影的代表 follower 副本,虚线表示 follower副本从 leader副本上拉取消息。

当生产者写入消息的时候都写入 leader 副本,每个 broker 都有消息从生产者流入;

当消费者读取消息的时候也是从 leader 副本中读取的,每个 broker 都有消息流出到消费者。

我们很明显地可以看出,每个 broker 上的读写负载都是一样的,这就说明 Kafka 可以通过主写主读实现主写从读实现不了的负载均衡。上图展示的是一种理想的部署情况,有以下几种情况会造成一定程度上的负载不均衡:

  • broker 端的分区分配不均。当创建主题的时候可能会出现某些 broker分配到的分区数多而其他 broker分配到的分区数少,那么自然而然地分配到的 leader副本也就不均。
  • 生产者写入消息不均。生产者可能只对某些 broker 中的 leader副本进行大量的写入操作,而对其他 broker 中的 leader副本不闻不问。
  • 消费者消费消息不均。消费者可能只对某些 broker 中的 leader 副本进行大量的拉取操作,而对其他 broker 中的 leader副本不闻不问。
  • leader副本的切换不均。在实际应用中可能会由千 broker 宅机而造成主从副本的切换,或者分区副本的重分配等,这些动作都有可能造成各个 brokerleader 副本的分配不均。

针对第一种情况,在主题创建的时候尽可能使分区分配得均衡,好在 Kafka 中相应的分配算法也是在极力地追求这一 目标,如果是开发入员自定义的分配,则需要注意这方面的内容。对于第二和第三种情况,主写从读也无法解决。对于第四种情况,Kafka 提供了优先副本的选举来达到 leader 副本的均衡,与此同时,也可以配合相应的监控、告警和运维平台来实现均衡的优化。

在实际应用中,配合监控、告警、运维相结合的生态平台,在绝大多数情况下 Kafka 都能做到很大程度上的负载均衡。总的来说,Kafka 只支持主写主读有几个优点:可以简化代码的实现逻辑,减少出错的可能;将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;没有延时的影响;在副本稳定的清况下,不会出现数据不一致的情况。为此,Kafka 又何必再去实现对它而言毫无收益的主写从读的功能呢?这一切都得益于 Kafka 优秀的架构设计,从某种意义上来说,主写从读是由于设计上的缺陷而形成的权宜之计。

2. 日志同步机制

在分布式系统中,日志同步机制既要保证数据的一致性,也要保证数据的顺序性。虽然有许多方式可以实现这些功能,但最简单高效的方式还是从集群中选出一个 leader 来负责处理数据写入的顺序性。只要 leader 还处于存活状态,那么 follower 只需按照 leader 中的写入顺序来进行同步即可。

通常情况下,只要 leader 不宕机我们就不需要关心 follower 的同步问题。不过当 leader 时机时,我们就要从 follower 中选举出一个新的 leaderfollower 的同步状态可能落后 leader 很多,甚至还可能处于宅机状态,所以必须确保选择具有最新日志消息的 follower作为新的 leader。日志同步机制的一个基本原则就是:如果告知客户端已经成功提交了某条消息,那么即使 leader宕机,也要保证新选举出来的 leader 中能够包含 这条消息。 这里就有一个需要权衡 **(tradeoff)**的地方,如果 leader 在消息被提交前需要等待更多的 follower 确认,那么在它宕机之后就可以有更多的 follower 替代它,不过这也会造成性能的下降。

对于这种 tradeoff, 一种常见的做法是 “少数服从多数” ,它可以用来负责提交决策和选举决策。虽然 Kafka 不采用 这种方式,但可以拿来探讨和理解 tradeoff 的艺术。在 这种方式下,如果我们有 2f+l 个副本,那么在提交之前必须保证有 f+l 个副本同步完消息。同时为了保证能正确选举出新的 leader, 至少要保证有 f+l 个副本节点完成日志同步并从同步完成的副本中选举出新的 leader 节点。并且在不超过 f 个副本节点失败的清况下,新的 leader 需要保证不会丢失已经提交过的全部消息。 这样在任意组合的 f+l 个副本中,理论上可以确保至少有一个副本能够包含已提交的全部消息, 这个副本的日志拥有最全的消息,因此会有资格被选举为新的 leader来对外提供服务。

“少数服从多数 ” 的方式有一个很大的优势,系统的延迟取决于最快的几个节点,比如副本数为 3, 那么延迟就取决于最快的那个 follower 而不是最慢的那个(除了 leader, 只需要另 一个 follower 确认即可)。不过它也有一些劣势,为了保证 leader 选举的正常进行,它 所能容忍的失败 follower 数比较少,如果要容忍 lfollower 失败,那么至少要有 3 个副本,如果要容忍 2follower 失败,必须要有 5 个副本。也就是说,在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能的急剧下降。 这也就是 “少数服从多数” 的这种 Quorum模型常被用作共享集群配置(比如 ZooKeeper) , 而很少用于主流的数据存储中的原因。

与 “少数服从多数” 相关的一致性协议有很多,比如 ZabRaftViewstamped Replication 等。而 Kafka 使用的更像是微软的 PacificA 算法。

Kafka 中动态维护着一个 ISR 集合,处与 ISR 集合内的节点保待与 leader 相同的高水位CHW) , 只有位列其中的副本 (unclean.leader.election.enable 配置为 false) 才有资格被选为新的 leader。写入消息时只有等到所有 ISR 集合中的副本都确认收到之后才能被认为已经提交。位于 ISR 中的任何副本节点都有资格成为 leader, 选举过程简单、开销低,这也是 Kafka 选用此模型的重要因素。Kafka 中包含大量的分区,leader副本的均衡保障了整体负载的均衡, 所以 这一因素也极大地影响 Kafka 的性能指标。

在采用 ISR模型和 (f+l) 个副本数的配置下,一个 Kafka 分区能够容忍最大 f 个节点失败,相比于 “少数服从多数” 的方式 所需的节点数大幅减少。实际上,为了能够容忍 f 个节点失败,“少数服从多数” 的方式和ISR的方式都需要相同数量副本的确认信息才能提交消息。比如,为了容忍1个节点失败,“少数服从多数” 需要3个副本和1个 follower的确认信息,采用ISR的方式需要2个副本和1个 fo llower的确认信息。在需要相同确认信息数的情况下,采用ISR的方式所需要的副本总数变少,复制带来的集群开销也就更低, “少数服从多数” 的优势在于它可以绕开最慢副本的确认信息,降低提交的延迟,而对 Kafka而言,这种能力可以交由客户端自己去选择。

另外,一般的同步策略依赖于稳定的存储系统来做数据恢复,也就是说,在数据恢复时日志文件不可丢失且不能有数据上的冲突。不过 它们忽视了两个问题:首先,磁盘故障是会经常发生的,在持久化数据的过程中并不能完全保证数据的完整性;其次,即使不存在硬件级别的故障,我们也不希望在每次写入数据时执行同步刷盘(fsync)的动作来保证数据的完整性,这样会极大地影响性能。而Kafka不需要宅机节点必须从本地数据日志中进行恢复,Kafka的同步方式允许宅机副本重新加入ISR集合,但在进入ISR之前必须保证自己能够重新同步完leader中的所有数据。

3. 可靠性分析

很多人问过笔者类似这样的一些问题:怎样可以确保 Kafka完全可靠?如果这样做就可以确保消息不丢失了吗?笔者认为:就可靠性本身而言,它并不是一个可以用简单的 ”是 ” 或 “ 否 ”来衡量的一个指标,而一般是采用几个9来衡量的。任何东西不可能做到完全的可靠,即使能应付单机故障,也难以应付集群、数据中心等集体故障,即使躲得过天灾也未必躲得过人祸。就可靠性而言,我们可以基于一定的假设前提来做分析。本节要讲述的是:在只考虑Kafka本身使用方式的前提下如何最大程度地提高可靠性。

就 Kafka而言,越多的副本数越能够保证数据的可靠性,副本数可以在创建主题时配置,也可以在后期修改,不过副本数越多也会引起磁盘、网络带宽的浪费,同时会引起性能的下降。一般而言,设置副本数为3即可满足绝大多数场景 对可靠性的要求,而对可靠性要求更高的场景下,可以适当增大这个数值,比如国内部分银行在使用 Kafka时就会设置副本数为5。与此同时,如果能够在分配分区副本的时候引入基架信息(broker.rack参数),那么还要应对机架整体宅机的风险。

仅依靠副本数来支撑可靠性是远远不够的,大多数入还会想到生产者客户端参数acks。我们就介绍过这个参数:相比于0和1, acks = -1 (客户端还可以配置为all, 它的含义与-1一样,以下只以-1来进行陈述)可以最大程度地提高消息的可靠性。

对于acks= 1的配置,生产者将消息发送到leader副本, ader副本在成功写入本地日志之后会告知生产者已经成功提交。如果此时ISR集合的 follower副本还没来得及拉取到leader中新写入的消息,leader就宕机了,那么此次发送的消息就会丢失。

image-20240615192411084

对于 ack = -1的配置,生产者将消息发送到leader 副本,leader 副本在成功写入本地日志之后还要等待 ISR 中的 follower 副本全部同步完成才能够告知生产者已经成功提交, 即使此时leader 副本宅机, 消息也不会丢失。

image-20240615192953940

同样对于 acks = -1 的配置,如果在消息成功写入 leader 副本之后,并且在被ISR 中的所有副本同步之前leader 副本宅机了,那么生产者会收到异常以此告知此次发送失败。

image-20240615193353338

消息发送的3种模式,即发后即忘、同步和异步。对于发后即忘的模式,不管消息有没有被成功写入,生产者都不会收到通知,那么即使消息写入失败也无从得知,因此发后即忘的模式不适合高可靠性要求的场景。如果要提升可靠性,那么生产者可以采用同步或异步的模式,在出现异常情况时可以及时获得通知,以便可以做相应的补救措施,比如选择重试发送(可能会引起消息重复)。

有些发送异常属于可重试异常,比如NetworkException, 这个可能是由瞬时的网络故障而导致的,一般通过重试就 可以解决。对千这类异常,如果直接抛给客户端的使用方也未免过于兴师动众,客户端内部本身提供了重试机制来应对这种类型的异常,通过 retries参数即可配置。默认情况下,retries参数设置为O, 即不进行重试,对于高可靠性要求的场景,需要将这个值设置为大于0 的值,与retries 参数相关的还有一个retry.backoff.ms参数,它用来设定两次重试之间的时间间隔,以此避免无效的频繁重试。在配置retries 和retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大千这个异常恢复时间,以此来避免生产者过早地放弃重试。如果不知道retries参数应该配置为多少,则可以参考KafkaAdminClient, 在KafkaAdminClient中retries参数的默认值为5。

注意如果配置的retries参数值大于 O, 则可能引起一些负面的影响。由于默认的max.in.f巨ght.reques 七s.per.connection参数值为5, 这样可能会影 响 消息的顺序性,对此 要么放 弃 客 户端内部的重试功能 , 要么将max.in.flight.requests.per.connection参数设置为1, 这样也就放弃了吞吐。其次,有些应用对于时延的要求很高,很 多时候都是需要快速失败的,设置retries>0会增加客户端对于异常的反馈时延,如此可能会对应用造成不良的影响。

再来看一下acks = -1的情形,它要求ISR中所有的副本都收到相关的消息之后才能够告知生产者已经成功提交。试想一下这样的情形,leader 副本的消息流入速度很快,而follower副本的同步速度很慢,在某个临界点时所有的 fllower副本都被剔除出了ISR集合,那么ISR中只有一个 leader副本,最终acks= -1演变为acks= 1的情形,如此也就加大 了消息丢失的风险。Kafka也考虑到 了这种情况,并为此提供了min.insync.rep巨cas参数(默认值为1)来作为辅助(配合 acks= -1来使用),这个参数指定 了ISR集合中最小的副本数,如果不满足条件就会抛出NotEnoughReplicasException或NotEnoughReplicasAfterApendException。在正常的配置下,需要满足副本数 >min. in sync. replicas参数的值。一个典型的配置方案为:副本数配置为3, min.insync.rep巨cas 参数值配置为2。注意min.insync.replicas参数在提升可靠性的时候会从侧面影响可用性。试想如果ISR中只有一个 leader副本,那么最起码还可以使用,而 此时如果配置 min. insync. replicas>l, 则会使消息无法写入。

与可靠性和ISR集合有关的还有一个参数——unclean.leader.election.enable。这个参数的默认值为false, 如果设置为true就意味着当leader下线时候可以从非ISR集合中选举出新的leader, 这样有可能造成数据的丢失。如果这个参数设置为false, 那么也会影响可用性,非ISR集合中的副本虽然没能及时同步所有的消息,但最起码还是存活的可用副本。随着Kafka版本的变更,有的参数被淘汰,也有 新的参数加入进来,而传承下来的参数一般都很少会修改既定的默认值,而 unclean.leader.election.enable 就是这样一 个反例,从0.11.0.0版本开始,unclean.leader.election.enable 的默认值由原来的true改为了false, 可以看出Kafka的设计者愈发地偏向于可靠性的提升。

在broker端还有两个参数 log.flush.interval.messages 和log.flush.interval.ms , 用来调整同步刷盘的策略,默认是不做控制而交由操作系统本身来进行处理。同步刷盘是增强一个组件可靠性的有效方式,Kafka 也不例外,但笔者对同步刷盘有一定的疑问——绝大多数情景下,一个组件 (尤其是大数据量的组件)的可靠性不应该由同步刷盘这种极其损耗性能的操作来保障,而应该采用多副本的机制来保障。

对于消息的可靠性,很多人都会忽视消费端的重要性,如果一条消息成功地写入 Kafka,并且也被 Kafka完好地保存,而在消费时由于某些疏忽造成没有消费到这条消息,那么对于应用来说,这条消息也是丢失的。

enable.auto.commit的参数的默认值为true, 即开启自动位移提交的功能,虽然这种方式非常简便,但它会带来重复消费和消息丢失的问题,对千高可靠性要求的应用来说显然不可取,所以需要将enable.auto.commit参数设置为false来执行手动位移提交。在执行手动位移提交的时候也要遵循一个原则 :如果消息没有被成功消费,那么就不能提交所对应的消费位移。对于高可靠要求的应用来说,宁愿重复消费也不应该因为消费异常而导致消息丢失。有时候,由于应用解析消息的异常,可能导致部分消息一直不能够成功被消费,那么这个时候为了不影响整体消费的进度,可以将这类消息暂存到死信队列中,以便后续的故障排除。

对于消费端, Kafka 还提供了 一个可以兜底的功能,即回溯消费,通过这个功能可以让我们能够有机会对漏掉的消息相应地进行回补,进而可以进一步提高可靠性。


本站由 卡卡龙 使用 Stellar 1.27.0 主题创建

本站访问量 次. 本文阅读量 次.