1. 分区的概念 Kafka 中 Topic 被分成多个 Partition 分区。Topic 是一个逻辑概念 ,Partition 是最小的存储单元 ,掌握着一个 Topic 的部分数据。每个 Partition 都是一个单独的 log 文件,每条记录都以追加的形式写入。
2. 分区的管理 2.1 优先副本选举 创建一个分区个数是3、副本因子是3的主题topic-partitions,创建后的分配信息如下:
1 2 3 4 5 6 7 8 9 # 1、创建主题 bin/kafka-topics.sh --zookeeper hadoop101:2181 --create --topic topic-partitions --partitions 3 --replication-factor 3 # 2、查看主题详情 bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-partitions Topic:topic-partitions PartitionCount:3 ReplicationFactor:3 Configs: Topic: topic-partitions Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: topic-partitions Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: topic-partitions Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
可以看到,leader副本均匀分布在brokerId为0、1、2的broker的节点之中。针对同一个分区而言, 同一个 broker节点中不可能出现它的多个副本, 即Kafka集群的一个 broker中最多只能有它的一个副本 ,我们可以将leader副本所在的broker节点叫作分区的leader节点 ,而 follower副本所在的broker节点叫作分区的follower节点 。
此时,我们将brokerId为2的节点重启 ,那么主题topic-partitions新的分布信息如下:
1 2 3 4 5 6 # 3、查看主题详情 bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-partitions Topic:topic-partitions PartitionCount:3 ReplicationFactor:3 Configs: Topic: topic-partitions Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0 Topic: topic-partitions Partition: 1 Leader: 1 Replicas: 2,1,0 Isr: 1,0 Topic: topic-partitions Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,1
如此一来,原本负载均衡变成了失衡:节点1的负载最高,而节点0的负载最低。
“分区平衡”:通过一定的方式促使优选副本选举为leader副本,以此促进集群的负载均衡。
在kafka中可以提供分区自动平衡功能,榆次对应的broker端参数是auto.leader.rebalance.enbale,此参数的默认值是true,默认情况下此功能是开启的。
此外,kafka中的kafka-perferred-replica-election.sh脚本提供了对分区leader副本重新平衡的功能。
1 2 3 4 5 6 7 8 9 # 4、对分区leader副本进行重平衡 bin/kafka-preferred-replica-election.sh --zookeeper hadoop101:2181 # 5、查看主题详情 bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-partitions Topic:topic-partitions PartitionCount:3 ReplicationFactor:3 Configs: Topic: topic-partitions Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: topic-partitions Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 1,0,2 Topic: topic-partitions Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 1,0,2
可以看到脚本执行后,主题topic-partitions中所有的leader副本的分布已经和刚创建时的一样了,所有的优先副本都会成为leader副本。
注意到:上述方式会将集群中所有的分区 都执行以便优先副本的选举工作,分区数越多打印的信息也就越多。leader副本的转移也是一项高成本的工作,如果需要执行的分区数很多,那么必然会对客户端造成一定的影响。
最后,kafka还提供了path-to-json-file参数来小批量地对部分分区执行优先副本的选举操作。具体来说就是指定一个path-to-json-file参数来指定一个JSON文件,这个JSON文件里面保存了需要执行这个优先副本选举的分区清单。
文件内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 # 创建election.json { "partitions" : [ { "partition" : O, "topic" : "topic-partitions" } , { "partition" : 1 , "topic" : "topic-partitions" } , { "partition" : 2 , "topic" : "topic-partitions" } ] }
执行脚本:
1 2 3 4 5 # 6、小批量对部分分区执行优先副本策略 bin/kafka-preferred-replica-election.sh --zookeeper hadoop101:2181 --path-to-json-file election.json # 7、查看主题详情 bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-partitions
2.2 分区重分配 当集群中一个节点突然宕机下线时,如果节点上的分区是单副本的, 那么这些分区就变得不可用了, 在节点恢复前, 相应的数据也就处于丢失状态;如果节点上的分区是多副本的,那么位于这个节点上的leader副本的角色会转交到集群的其他 follower副本中。 总而言之, 这个节点上的分区副本都已经处于功能失效的状态,Kafka并不会将这些失效的分区副本自动地迁移到集群中剩余的可用broker节上,如果放任不管, 则不仅会影响整个集群的均衡负载,还会影响整体服务的可用性和可靠性。
a. 当要对集群中的一个节点进行有计划的下线操作时, 为了保证分区及副本的合理分配, 我们也希望通过某种方式能够将该节点上的分区副本迁移到其他的可用节点上。
b. 当集群中新增broker节点时, 只有新创建的主题分区才有可能被分配到这个节点上, 而之前的主题分区并不会自动分配到新加入的节点中, 因为在它们被创建时还没有这个新节点, 这样新节点的负载和原先节点的负载之间严重不均衡。
为了解决上述问题,需要让分区副本再此进行合理的分配,也就是所谓的分区重分配。kafka提供了kafka-reassign-partitions.sh脚本来执行分区重分配的工作,它可以在集群扩容、broker节点失效的场景下对分区进行迁移 。
kafka-reassign-partitions.sh脚本的使用:
创建一个包含主题清单的JSON文件
根据主题清单和broker节点清单生成一份重分配方案
根据上述方案执行具体的重分配动作
示例: 在一个由3个节点(broker 0、broker 1、broker2)组成的集群中创建一 个主题topic-reassign, 主题中包含4个分区和2个副本:
1 2 3 4 # 1、创建主题 bin/kafka-topics.sh --zookeeper hadoop101:2181 --create --topic topic-reassign --partitions 2 --replication-factor 4 # 2、查询主题详情 bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-reassign
下线brokerId为1的broker节点,在此之前我们需要将其上的分区副本迁移出去。
使用kafka-reassign-partitions.sh脚本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # 1、创建一个JSON文件,文件内容为要进行分区重分配的主题清单,比如: reassign.json { "topics":[ { "topic":"topic-reassign" } ], "version":1 } # 2、根据这个JSON文件和指定所要分配的broker节点列表来生成一份候选的重分配方案 bin/kafka-reassign-partitions.sh --zookeeper hadoop101:2181 --ganerate --topics-to-move-json-file reassign.json --broker-list 0,2 # 3、执行具体的重分配动作 bin/kafka-reassign-partitions.sh --zookeeper hadoop101:2181 --execute --reassignment-json-file reassign.json # 4、查询主题详情 bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-reassign
可以看到主题中所有分区副本都只在0和2的broker节点上分布了。
基本原理:先通过控制器为每个分区添加新副本(增加副本因子),新的副本将从分区的leader副本那里复制所有的数据。根据分区的大小不同,复制过程可能需要花一些时间,因为数据是通过网络复制到新的副本上的。在复制完成之后,控制前将旧副本清单里移除(恢复为原先的副本因子数)。注意需要在重分配过程中确保有足够的空间。
此时可以观察到topic-reassign中有三个leader副本在broker0上,而只有1个leader副本在broker2上,这样就不均衡了。可以借助kafka-perferred-replica-election.sh脚本执行一次优先副本的选举行动。
1 bin/kafka-preferred-replica-election.sh --zookeeper hadoop101:2181
此时,可以看到主题topic-reassign的具体信息已经趋于完美。
1 bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-reassign
验证查看分区重分配的进度:
1 bin/kafka-reassign-partitions.sh --zookeeper hadoop101:2181 --verify --reassignment-json-file reassign.json
2.3 复制限流 重分配的本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本来达到最终目的。数据复制会占用额外的资源,如果重分配的量太大必然会严重影响整体的性能,尤其是处于业务高峰的时候。减小重分配的粒度,以小批次的方式操作是一种可行的解决思路。如果集群中某个主题或某个分区的流程在某个时间段内特别大,那么只靠减小粒度是不足应付的,这时候就需要有一个限流机制,可以对副本间的复制流量加以限制来保证重分配期间整体服务不会受到太大影响 。
副本间的复制限流有两种实现方式:kafka-config.sh脚本、kafka-reassign-partitions.sh脚本(略过);
2.4 修改副本因子 创建主题之后我们还可以修改分区的个数, 同样可以修改副本因子(副本数)。 修改副本因子的使用场景也很多, 比如在创建主题时填写了错误的副本因子数而需要修改, 再比如运行一段时间之后想要通过增加副本因子数来提高容错性和可靠性。
修改副本因子的功能也是通过重分配所使用的kafka-reassign-partition.sh 脚本实现的。project.json文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 { "version" : 1 , "partitions" : [ { "topic" : "topic-throttle" , "partition" : 1 , "replicas" : [ 2 , 0 ] , "log_dirs" : [ "any" , "any" ] } , { "topic" : "topic-throttle" , "partition" : 0 , "replicas" : [ 0 , 2 ] , "log_dirs" : [ "any" , "any" ] } , { "topic" : "topic-throttle" , "partition" : 2 , "replicas" : [ 0 , 2 ] , "log_dirs" : [ "any" , "any" ] } ] }
对于分区1而言,我们可以增加一个副本,比如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 { "topic": "topic-throttle", "partition": 1, "replicas": [ 2, 1, 0 ], "log_dirs": [ "any", "any", "any" ] }
其他分区也类似操作,修改完成后保存为add.json文件
增加副本数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 # 1、查询topic-throttle主题 bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-throttle Topic: topic-throttle Parti tionCount: 3 ReplicationFactor: 2 Configs: Topic: topic-throttle Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: topic-throttle Partition: 1 Leader: 1 Replicas: 1,2 Isr: 2,1 Topic: topic-throttle Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0 # 2、执行kafka-reassign-partition.sh脚本 bin/kafka-reassign-partition.sh --zookeeper hadoop101:2181 --describe --topic topic-throttle --execute --reassignment-json-file add.json Current partition replica assignment {"version":1,"partitions":[{"topic":"topic-throttle","partition":1,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-throttle","partition":0,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-throttle","partition":2,"replicas":[0,2],"log_dirs":["any","any"]}]} Save this to use as the --reassignment-json-file op七ion during rollback Successfully started reassignmen七 of partitions. # 3、再次查询topic-throttle主题 bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-throttle Topic:topic-throttle PartitionCount:3 ReplicationFactor:3 Configs: Topic: topic-throttle Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: topic-throttle Partition: 1 Leader: 1 Replicas: 0,1,2 Isr: 2,1,0 Topic: topic-throttle Partition: 2 Leader: 2 Replicas: 0, 1,2 Isr: 2,0,1
再次修改reassign.json为如下:
1 {"version":1,"partitions":[{"topic":"topic-throttle","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"topic-throttle","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"topic-throttle","partition":2,"replicas":[0],"log_dirs":["any"]}]}
减少副本数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 # 1、查询topic-throttle主题 bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-throttle # 2、执行kafka-reassign-partition.sh脚本 bin/kafka-reassign-partition.sh --zookeeper hadoop101:2181 --describe --topic topic-throttle --execute --reassignment-json-file reassign.json # 3、再次查询topic-throttle主题 bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-throttle Topic : topic-throttle PartitonCount 3 ReplicationFactor : 1 Configs : Topic : topic-throttle Partition: 0 Leader : 2 Replicas : 2 Isr : 2 Topic : topic-throttle Partition: 1 Leader : 1 Replicas : 1 Isr: 1 Topic : topic-throttle Partition: 2 Leader : 0 Replicas : 0 Isr : 0
3. 如何选择合适的分区数 这个问题没有权威的答案,需要根据实际的业务场景、软件条件、硬件条件、负载情况等来做具体的考量。
3.1 性能测试工具 kafka本身提供的用于生产者性能测试的kafka-producer-perf-test.sh和用于消费者性能测试的kafka-consumer-perf-test.sh
向一个只有1个分区和1个副本的主题 topic-1 中发送 100 万条消息,并且每条消息大小为 1024B 生产者对应的 acks 参数为 l。
1 2 3 4 5 6 7 8 bin/kafka-producer-perf-test.sh --topic topic-1 --num-records 1000000 --record-size 1024 --throughput -1 --print-metrice --producer-props bootstrap.servers=localhost:9092 acks=1 topic:用来指定生产者发送消息的目标主题 num-records:用来指定发送消息的总条数 record-size:用来设置每条消息的字节数 producer-props:用来指定生产者配置,可以同时指定多组配置,各组配置之间以空格分割 throughput:用来进行限流控制,当设定的值小于0时不限流,当设定的值大于0时,当发送的吞吐量大于该值时就会被阻塞一段时间 print-metrice:测试完成之后打印指标信息
消费消息:
1 bin/kafka-consumer-perf-test.sh --topic topic-1 --messages 1000000 --broker-list localhost:9092
3.2 分区数越多吞吐量越高吗? 分区是Kafka 中最小的并行操作单元,对生产者而言, 每一 个分区的数据写入是完全可以并行化的;对消费者而言, Kafka 只允许单个分区中的消息被一 个消费者线程消费, 一 个消费组的消费并行度完全依赖于所消费的分区数。 如此看来, 如果一 个主题中的分区数越多, 理论上所能达到的吞吐量就越大, 那么事实真的如预想的一样吗?
当超过一个临界值后,分区数越多可能会导致吞吐量下降
3.3 分区数的上限 一味地增加分区数并不能使吞吐量 一直得到提升, 并且分区数也并不能一直增加, 如果超过默认的配置值, 还会引起 Kafka进程的崩溃。
1 2 # 创建一个包含10000个分区的主题 bin/kafka-topics.sh --zookeeper hadoop101:2181 --create --topic topic-bomb --replication-factor 1 --partitions 10000
3.4 考量因素 如何选择合适的分区数?一个“恰如其分”的答案就是视情况而定。