1. 概述

从编程的角度而言, 生产者就是负责向 Kafka发送消息的应用程序。 在 Kafka 的历史变迁中, 一共有两个大版本的生产者客户端:第一个是于 Kafka 开源之初使用 Scala 语言编写的客户端,我们可以称之为旧生产者客户端(Old Producer) 或 Scala 版生产者客户端;第二个是从 Kafka 0.9.x 版本开始推出的使用Java 语言编写的客户端, 我们可以称之为新生产者客户端 (NewProducer) 或 Java 版生产者客户端, 它弥补了旧版客户端中存在的诸多设计缺陷。

虽然Kafka是用Java/Scala语言编写的,但这并不妨碍它对于多语言的支待,在Kafka官网中,”CLIENTS” 的入口提供了一份多语言的支待列表, 其中包括常用的C、C++、 Python 、 Go等语言, 不过这些其他类语言的客户端并非由Kafka社区维护,如果使用则需要另行下载。 本章主要针对现下流行的新生产者 (Java语言编写的)客户端做详细介绍, 而旧生产者客户端已被淘汰,故不再做相应的介绍了。

2. 原理分析

2.1 整体架构

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程Sender 线程 (发送线程)。在主线程中由KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器( RecordAccumulator ,也称为消息收集器)中。 Sender 线程负责从RecordAccumulator 获取消息并将其发送到 Kafka中。

RecordAccumulator 主要用来缓存消息以便Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为33554432B ,即 32M。 如果生产者发送消息的速度超过发送到服务器的速度 ,则会导致生产者空间不足,这个时候 KafkaProducer的send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为 60000,即60秒。

主线程中发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列( Deque )中,在RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch ,即 Deque <ProducerBatch>。消息写入缓存时,追加到双端队列的尾部: Sender读取消息时 ,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord, ProducerBatch 中可以包含一至多个ProducerRecord 。通俗地说, ProducerRecord 是生产者中创建的消息,而ProducerBatch 是指一个消息批次 ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 凑成一个较大 ProducerBatch ,也可以减少网络请求的次数以提升整体的吞吐量 。
消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息 。在 Kafka 产者客户端中,通过 java.io.ByteBuffer 实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在 RecordAccumulator 的内部还有一个 BufferPool,它主要用来实现 ByteBuffer 的复用,以实现缓存的高效利用 。不过 BufferPool 只针对特定大小的ByteBuffer 进行管理,而其他大小的 ByteBuffer 不会缓存进 BufferPool 中,这个特定的大小batch.size 参数来指定,默认值为 16384B ,即 16KB。我们可以适当地调大 batch.size参数以便多缓存一些消息。

ProducerBatch 大小和 batch.size 参数也有着密切的关系。当一条消息(ProducerRecord ) 流入RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch (如果没有则新建),查看 ProducerBatch 中是否还可以写入这个ProducerRecord ,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch 。在新建ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch ,这样在使用完这段内存区域之后,可以通过 BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch 这段内存区域不会被复用。

Sender 从 RecordAccumulator 获取缓存的消息之后,会进一步将原本<分区, Deque<ProducerBatch>>的保存形式转变成< Node,List< ProducerBatch>的形式,其中 Node 表示 Kafka集群的 broker 节点 。对于网络连接来说,生产者客户端是与具体 broker 节点建立的连接,也就是具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言 ,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络 I/O 层面的转换。

在转换成<Node, List<ProducerBatch>>的形式之后, Sender 会进一步封装成<Node,Request> 的形式,这样就可以将 Request 请求发往各个 Node 了, 这里 Request 是指 Kafka 的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest 。

请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中, InFlightRequests 保存对象的具体形式为 Map<Nodeld, Deque quest>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求( Nodeld 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.requests.per.connection ,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应( Response )。通过比较 Deque size 与这个参数的大小来判断对应的 Node 中是否己经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

2.2 元数据的更新

InFlightRequests 可以获得 leastLoadedNode ,即所有 Node 中负载最小的那一个。这里的负载最小是通过每个 Node 在 InFlightRequests 中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于下图中的 InFlightRequests 来说,图中展示了 三个节点Node0 、 Node1 、 Node2 ,很明显 Node1 负载最小。 也就是说, Node1 为当前的 leastLoadedNode。选择 leastLoadedNode 发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。 leastLoadedNode 的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。

image-20241027090213459

我们使用如下的方式创建了一条消息ProducerRecord: ProducerRecord<String, String> record = new ProducerRecord<>(topic,”Hello, Kafka !”) ;

我们只知道主题的名称,对于其他一些必要的信息却一无所知 KafkaProducer 要将此消息追加到指定主题的某个分区所对应的 leader 副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定〉目标分区,之后KafkaProducer 需要知道目标分区的 leade 副本所在的 broker 节点的地址、端口等信息才能建立连接,最终才能将消息发送到 Kafka ,在这一过程中需要的信息都属于元数据信息。

前面我们了解了 bootstrap.servers 参数只需要配置部分 broker 节点的地址即可,不需要配置所有 broker 节点的地址,因为客户端可以自己发现其他 broker 节点的地址,这一过程也属于元数据相关的更新操作 。与此同时 ,分区数量及 leader 副本的分布都会动态变化, 客户端需要动态地捕捉这些变化。

元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的 leader 副本分配在哪个节点上, follower 副本分配在哪些节点上,哪些副本在 AR 、 ISR 等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms 时间没有更新元数据都会引起元数据的更新操作 。客户端参数metada.max.age.ms 的默认值为 300000 ,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出 leastLoadedNode, 然后向这个Node 发送 MetadataRequest 请求来获取具体的元数据信息。这个更新操作是由 Sender 线程发起的, 在创建完 MetadataRequest 之后同样会存入 InFlightRequests ,之后的步骤就和发送消息时的类似。元数据虽然由 Sender 线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过 synchronized 和 final 关键字来保障。

3. 客户端开发

  1. 配置生产者客户端参数以及创建响应的生产者示例。
  2. 构建待发送的消息。
  3. 发送消息。
  4. 关闭生产者实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class KafkaProducerAnalysis {  
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";

public static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "producer.client.id.demo");
return props;
}

public static void main(String[] args) {
Properties props = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ProducerRecord<K, V> {
private final String topic; //主题
private final Ineger partition; //分区号
private final Headers headers; //消息头部
private final K key; //键
private final V value; //值
private final Long timestamp; //消息的时间戳
//省略其他成员方法和构造方法
}

3.1 必要的参数配置

  • bootstrap.servers: 该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单, 具体的内容格式为hostl:portl,host2:port2, 可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为 “” 。
  • key.serializer和value.serializer: broker端接收的消息必须以字节数组(byte[])的形式存在。

3.2 消息的发送

  • 创建ProducerRecord对象;public ProducerRecord(String topic,V value)
  • 发送消息有三种模式:发后即忘(fire-and-forget)、同步(sync)、异步(async)
  • send的重载方法;
1
2
public Future<RecordMeatdata> send(ProducerRecord<K,V> record);
public Future<RecordMeatdata> send(ProducerRecord<K,V> record,Callback callback);

3.3 序列化

生产者需要用序列化器(Serializer)把对象转换为字节数组才能通过网络发送给kafka。
StringSerializer的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class StringSerializer implements Serializer<String> {  

private String encoding = "UTF-8"; // 修正编码为UTF-8

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null) {
encodingValue = configs.get("serializer.encoding");
}
if (encodingValue != null && encodingValue instanceof String) {
encoding = (String) encodingValue;
}
}

@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null) {
return null;
} else {
// 由于UTF-8是Java标准支持的编码,这里不需要显式指定编码
// 除非你确实需要读取configs中的编码
return data.getBytes(StandardCharsets.UTF_8); // 使用StandardCharsets.UTF_8替代自定义编码
}
} catch (UnsupportedEncodingException e) {
// 因为使用了StandardCharsets.UTF_8,所以这里实际上不会抛出UnsupportedEncodingException
// 但为了代码完整性,保留这个catch块,并抛出更具体的异常
throw new org.apache.kafka.common.errors.SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}

@Override
public void close() {
// 无需执行任何操作,因为StringSerializer没有需要关闭的资源
}
}

自定义的序列化器CompanySerializer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class CompanySerializer implements Serializer<Company> {  

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 这里可以配置序列化器的一些属性,但在这个例子中我们不做任何配置
}

@Override
public byte[] serialize(String topic, Company data) {
if (data == null) {
return null;
}

byte[] name = null;
byte[] address = null;

try {
if (data.getName() != null) {
name = data.getName().getBytes(StandardCharsets.UTF_8);
} else {
name = new byte[0];
}

if (data.getAddress() != null) {
address = data.getAddress().getBytes(StandardCharsets.UTF_8);
} else {
address = new byte[0];
}

ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
buffer.putInt(name.length);
buffer.put(name);
buffer.putInt(address.length);
buffer.put(address);

// 需要翻转buffer以使其为正确的字节顺序(如果需要的话)
buffer.flip();

return buffer.array();
} catch (UnsupportedEncodingException e) {
// 由于使用了StandardCharsets.UTF_8,所以这里实际上不会抛出UnsupportedEncodingException
// 但为了代码的完整性,我们仍然保留这个catch块
e.printStackTrace();
return new byte[0];
}
}

@Override
public void close() {
// 关闭方法在这个序列化器中不需要做任何事情
}
}
@Data
public class Company {
private String name;
private String address;
}

3.4 分区器

消息经过序列化之后就需要确定它发往的分区,如果消息ProducerReord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
自定义分区器的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class DemoPartitioner implements Partitioner {  

private final AtomicInteger counter = new AtomicInteger(0);

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// 使用AtomicInteger的getAndIncrement方法确保线程安全地递增计数器
// 并用模运算符确保结果在分区数量范围内
return counter.getAndIncrement() % numPartitions;
} else {
// 使用Murmur2哈希算法计算key的哈希值,并用模运算符确保结果在分区数量范围内
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

@Override
public void close() {
// 如果有需要在关闭时执行的清理操作,可以在这里添加
}

@Override
public void configure(Map<String, ?> configs) {
// 如果有需要配置的参数,可以在这里读取
}
}

3.5 生产者拦截器

这里讲的是生产者拦截器;生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息内容等,也可以用来在发送回调逻辑前做一些定制化需求,比如统计类工作。
自定义生产者拦截器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {  

private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
String modifiedValue = "prefix-" + record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), modifiedValue, record.headers());
}

@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
sendSuccess++;
} else {
sendFailure++;
}
}

@Override
public void close() {
double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] 发送成功率=" + String.format("%.2f%%", successRatio * 100));
}

@Override
public void configure(Map<String, ?> configs) {
// 这里可以配置拦截器的参数,如果不需要配置可以留空
}
}

4. 重要的生产者参数

4.1 acks

  • acks = 1。默认值为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。acks设置为1,是消息可靠性和吞吐量之间的折中方案。
  • acks = 0。生产者发送消息之后不需要等待任何服务端的响应。
  • acks = -1 或 acks = all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务器的成功响应。

4.2 max.request.size

这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即1MB。一般情况下,这个默认值就可以满足大多数的应用场景了。笔者并不建议读者盲目地增大这个参数的配置值,尤其是在对 Kafka 整体脉络没有足够把控的时候。因为这个参数还涉及一些其参数的联动,比如 broker 端的 message.max.bytes 参数,如果配置错误可能会引起一些不必要的异常。比如将 broker 端的 message.max.bytes 参数配置为 10 ,而 max.request.size参数配置为20, 那么当我发送一条大小为 15B 的消息时,生产者客户端就会报出如下的异常:
org.apache.kafka.common.errors.RecordTooLargeException: The request included a
message larger than the max message size the server will accept .

4.3 retries和retry.backoff.ms

retries参数用来配置生产者重试的次数,默认值是0,即在异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常。比如网络抖动、leader副本的选举等,这种异常往往是可以自行恢复的。生产者可以通过配置retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。因为不是所有的异常都是可以通过重试来解决的,比如消息太大,超过max.request.size参数配置的值时,这种方式就不可行了。

4.4 compression.type

这个参数用来指定消息的压缩方式,默认值为“none ”,即默认情况下,消息不会被压缩。该参数还可以配置为“gzip“ 、”snappy“和“z4”。 对消息进行压缩可以极大地减少网络传输、降低网络IO ,从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。

4.5 connections.max.idle.ms

这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。

4.6 linger.ms

这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入ProducerBatch 时间,默认值为 0。生产者客户端会在 ProducerBatch 填满或等待时间超过linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。 这个 linger.ms 参数与 TCP 协议中的 Nagle 算法有异曲同工之妙。

4.7 receive.buffer.bytes

这个参数用来设置 Socket 接收消息缓冲区( SO_RECBUF )的大小,默认值为 32768 (B) ,32KB。如果设置为 -1 ,则使用操作系统的默认值。如果 Producer 与 Kafka 处于不同的机房,则可以适地调大这个参数值。

4.8 send.buffer.bytes

这个参数用来设置 Socket 发送消息缓冲区 (SO_SNDBUF )的大 ,默认值 131072 (B) , 即128KB 。与 receive.buffer.bytes 参数一样 如果设置为 -1,则使用操作系统的默认值。

4.9 request.timeout.ms

这个参数用来配置 Producer 等待请求响应的 长时间,默认值为 30000 ( ms )。请求超之后可以选择进行重试。注意这个参数需要 broker 端参数 replica.lag.time.max.ms值要大,这样可以减少因客户端重试而引起的消息重复的概率。


本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.