1. 消费者与消费者组

消费者负责订阅kafka中的主题,并且从订阅的主题上拉取消息。每个消费者都有一个对应的消费组,当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。消费者并非逻辑上的概念,它是实际的应用实例,可以是一个线程,可以是一个进程,同一个消费组内的消费者可以部署在同一台机器上,也可以部署在不同的机器上。
消费组是逻辑上的一个概念,它将旗下的消费者归为一类;每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,可以通过消费者客户端参数group.id来配置,默认是一个空的字符串。
举例说明:

  • 某个主题四个分区:p0、p1、p2、p3,有两个消费组A和B都订阅了这个主题,消费组A中有四个消费者(A1、A2、A3、A4),消费组B中有两个消费者(B1、B2)。

image.png

  • 某个主题七个分区:p0、p1、p2、p3、p4、p5、p6,消费组C订阅了该主题
    • 消费组内只有一个消费者C0

image.png

  • 消费组内有两个消费者C0、C1

image.png

  • 消费组内有三个消费者C0、C1、C2

image.png

  • 消费组内有八个消费者C0、C1、C2、C3、C4、C5、C6、C7

image.png

2. 客户端开发

在Kafka的历史中,消费者客户端同生产者客户端一样也经历了两个大版本:第一个是于Kafka开源之初使用Scala语言编写的客户端,我们可以称之为旧消费者客户端(Old Consumer) 或 Scala消费者客户端;第二个是从Kafka 0.9.x版本开始推出的使用Java编写的客户端,我们可以称之为新消费者客户端(New Consumer)或Java消费者客户端,它弥补了旧客户端中存在的诸多设计缺陷。
一个正常的消费逻辑需要具备以下几个步骤:

  1. 配置消费者客户端参数以及创建响应的消费者实例
  2. 订阅主题
  3. 拉取消息并消费
  4. 提交消费位移
  5. 关闭消费者实例

消费者客户端示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class KafkaConsumerAnalysis {  
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo"; // 注意这里将 groupid 改为 groupId
public static final AtomicBoolean isRunning = new AtomicBoolean(true);

public static Properties initConfig() {
Properties props = new Properties();
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId); // 注意这里将 group!d 改为 groupId
props.put("client.id", "consumer.client.id.demo");
return props;
}

public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic)); // 注意这里将 七opic 改为 topic

try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = " + record.topic()
+ ", partition = " + record.partition()
+ ", offset = " + record.offset());
System.out.println("key = " + record.key()
+ ", value = " + record.value());

// do something to process record.
// ...
}
}
} catch (Exception e) {
// 注意:这里需要确保有 log 实例才能使用 log.error
// log.error("occur exception", e);
e.printStackTrace(); // 临时使用 e.printStackTrace() 代替 log.error
} finally {
consumer.close();
}
}
}

2.1 必要的参数配置

  • bootstrap.servers: 指定连接kafka集群所需要的broker地址清单;
  • group.id: 消费者隶属消费组的名称,默认值为:“”
  • key.serializer和value.serializer: 与生产者客户端的key.serializer和value.serializer参数对应,消费者从broker获取消息格式都是字节数组类型,所以需要执行响应的反序列化操作还原为对应的对象格式。

2.2 订阅主题与分区

一个消费者可以订阅一个或者多个主题,对于subscribe这个方法而言,既可以以集合的形式订阅多个主题,也可以正则表达式的形式订阅特定模式的主题,还可以指定分区订阅

  • 集合订阅方式
1
2
3
4
5
// 使用主题集合订阅并设置一个消费者再平衡监听器  
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) ;

// 使用主题集合订阅,不设置消费者再平衡监听器
public void subscribe(Collection<String> topics) ;
  • 正则表达式订阅方式
1
2
3
4
5
// 使用正则表达式模式订阅并设置一个消费者再平衡监听器  
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) ;

// 使用正则表达式模式订阅,不设置消费者再平衡监听器
public void subscribe(Pattern pattern) ;
  • 指定分区订阅方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void assign(Collection<TopicPartition> partitions);

// 比如:
consumer.assign(Arrays.asList(new TopicPartition("topic-demo", 0)));

// 如何知道主题的分区信息呢? 通过需要这个接口获取主题的分区信息
public List<Partitioninfo> partitionsFor(String topic)
其中 Partitionlnfo类型即为主题的分区元数据信息, 此类的主要结构如下:
public class Partitioninfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;
//这里省略了构造函数、属性提取、toString等方法
}
  • 取消订阅
1
2
3
consumer.unsubscribe(); 
consumer.subscribe(new ArrayList<String>());
consumer.assign(new ArrayList<TopicPartition>());

2.3 反序列化

与序列化器成对出现;注意如无特殊需要,还是不建议使用自定义的序列化和反序列化器,那样会增加消费者和生产者的耦合度,在系统升级换代时候容易出错。在设计应用中,如果现有的序列化和反序列化器满足不了需要的前提下,推荐使用Avro、JSON、Thrift、ProtoBuf等等通用的序列化工具来进行包装,以求实现尽可能更加通用的向后兼容。
StringSerializer的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class StringDeserializer implements Deserializer<String> {  
private String encoding = "UTF-8"; // 确保编码为正确的UTF-8

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null) {
encodingValue = configs.get("deserializer.encoding");
}
if (encodingValue != null && encodingValue instanceof String) {
encoding = (String) encodingValue;
}
}

@Override
public String deserialize(String topic, byte[] data) {
try {
if (data == null) {
return null;
} else {
return new String(data, encoding);
}
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}

自定义的序列化器CompanySerializer的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class CompanyDeserializer implements Deserializer<Company> {  

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 这里可以添加配置代码,但当前为空
}

@Override
public Company deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
if (data.length < 8) { // 假设前两个int分别表示name和address的长度,共8字节
throw new SerializationException("Size of data received by CompanyDeserializer is shorter than expected!");
}
ByteBuffer buffer = ByteBuffer.wrap(data);
int nameLen, addressLen;
String name, address;
nameLen = buffer.getInt(); // 使用正确的getInt方法
byte[] nameBytes = new byte[nameLen];
buffer.get(nameBytes);
addressLen = buffer.getInt(); // 修复闭合括号
byte[] addressBytes = new byte[addressLen];
buffer.get(addressBytes); // 修复闭合括号

try {
// 使用StandardCharsets代替硬编码的"UTF-8"字符串
name = new String(nameBytes, StandardCharsets.UTF_8);
address = new String(addressBytes, StandardCharsets.UTF_8);
} catch (UnsupportedEncodingException e) { // 虽然这里不会抛出UnsupportedEncodingException,但保持格式一致
throw new SerializationException("Error occurred when deserializing", e);
}

// 注意:return语句应该在try-catch块之外
return new Company(name, address);
}

@Override
public void close() {
// 这里可以添加清理资源的代码,但当前为空
}

}

使用Protostuff实现自定义的序列化和反序列化器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 序列化器
public byte[] serialize(String topic, Company data) {
if (data == null) {
return null;
}

Schema<Company> schema = (Schema<Company>) RuntimeSchema.getSchema(Company.class);
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

byte[] protostuff = null;
try {
protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}

return protostuff;
}
// 反序列化器
public Company deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}

Schema<Company> schema = (Schema<Company>) RuntimeSchema.getSchema(Company.class);
Company ans = new Company();

try {
ProtostuffIOUtil.mergeFrom(data, ans, schema);
} catch (Exception e) {
// 在这里处理异常,比如可以抛出一个自定义的序列化异常
throw new RuntimeException("Error occurred during deserialization", e);
}

return ans;
}

2.4 消息消费

Kafka中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。
poll()方法的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// timeout 用来控制poll()方法的阻塞时间,在消费者的缓存区里没有可用数据时会发生阻塞
public ConsumerRecords<K, V> poll(final Duration timeout)

// ConsumerRecord主要属性
public class ConsumerRecord<K, V> {
// 主题的名称
private final String topic;
// 分区的编号
private final int partition;
// 消息所属分区的偏移量
private final long offset;
private final long timestamp;
// 消息创建的时间戳和消息追加到日志的时间戳
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum;
//省略若干方法
}

// ConsumerRecords的常用 方法
// 一次拉取操作所获得的消息集,内部包含了若干ConsumerRecord
public Iterator<ConsumerRecord<K, V>> iterator()
// 获取消息集中指定分区的消息
public List<ConsumerRecord<K, V>> records(TopicPartition partition)
// 获取消息集中指定主题的消息
public Iterable<ConsumerRecord<K, V>> records(String topic)
// count() 计算出消息集中的消息个数
// isEmpty() 判断消息集是否为空
// empty() 获取一个空的消息集

2.5 位移提交

offset的概念:对于消费者消费到的位置(消费层面),将offset称为“位移”,对于消息在分区中的位置(存储层面),将offset称为“偏移量”。
在每次调用 poll () 方法时,它返回的是还没有被消费过的消息集(当然这个前提是消息己经存储在 Kafka 了,并且暂不考虑异常情况的发生) 要做到这一点,就需要记录上次消费时的消费位移,并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移 再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作 对于同 分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的 而在新消费者客户端中,消费位移存储在 Kafka内部的主题 _consumer_offsets 这里把将消费位移存储起来(持久化)的动作称为“提交” ,消费者在消费完消息之后需要执行消费位移的提交。

lastConsumedOffset、committed offset、position的概念

  • lastConsumedOffset:当前消费到的位置
  • committed offset:已经提交过的消费位移
  • position:下一次需要拉取的消息位置

对于位移提交具体时机的把握不同,可能会造成重复消费和消息丢失现象。

  • 自动提交:

在kafka中默认的消费位移的提交方式是自动提交,这个是消费者客户端参数enable.auto.commit配置,默认值是true,但是注意这个默认的自动提交不是每消费一条消息就会提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值是5s。
在默认方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交动位移提交的动作是在poll ()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
缺点:重复消费、消息丢失

  • 手动提交

开启手动提交只要将enable.auto.commit配置为false,细分为同步提交和异步提交,对应于kafka中的commitSync()和commitAsync()两种类型的方法。
commitSync()只能提交当前批次对应的position值;
commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets) 提交指定偏移量
poll()方法会根据最新位移来进行提交,只要没有发生不可恢复的错误,就会阻塞消费线程直至位移提交完成。
commitAsycn()
commitAsync(OffsetCommitCallback callback)
commitAsync(final Map<TopicPartition,OffsetAndMetadata> offsets,OffsetCommitCallback callback)
提交失败如何处理?重试,重试会造成重复消费问题,为此我们可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的。如果位移提交失败经常发生,那么说明系统肯定出现了故障。一般情况下,位移提交失败很少发生,不重试也没关系,后面提交也会成功的。重试会增加代码逻辑复杂度,不重试可能会增加重复消费的概率。

2.6 控制或关闭消费

1
2
3
4
5
6
7
8
9
10
//
public void pause(Collect on<Top cPartition> part ons)
public roid resume(Collection<TopicPartition> partitions)

public Set<TopicPartition> paused ()

public void close()
public void close(Duration timeout)
@Deprecated
public void close(long timeout , TimeUnit timeUnit)

2.7 指定位移消费

auto.offset.reset 参数默认配置“latest”,“earliest”,”none”

1
public void seek(TopicPartition partition long offset)

seek() 方法中的参数 partition 表示分区,而 offset 参数用来指定从分区的哪个位置开始消费。

2.8 再均衡

再均衡:分区的所属权从一个消费者转移到另 一消费者的行为, 它为消费组具备高可用 性和伸缩性提供保障, 使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。
缺点:消费者无法读取消息、造成重复消费现象
再均衡监听器的作用:用来设定发生再均衡动作前后的 一些准备或收尾的动作。
使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();  

consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
// 这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 提交当前偏移量
consumer.commitSync(currentOffsets);
// 清空当前偏移量映射
currentOffsets.clear();
}
// 这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// do nothing.
}
});

2.9 消费者拦截器

主要在消费到消息或在提交消费位移时进行一些定制化操作;

2.10 多线程实现

首先KafkaConsumer是非线程安全的;kafkaConsumer非线程安全并非意味着我们在消费的时候只能以单线程的方式执行。多线程的实现方式有以下几种:

  • 线程封闭;即为每一个线程实例化一个KafkaConsumer对象,该线程被称为消费线程,一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数,如果消费线程的个数大于分区数,就会有部分消费线程一直处于空闲状态。这种多线程的实现方式和开启多个消费进程的实现方式没有本质区别,优点是可以按照顺序消费每个分区中的消息,缺点是每个消费线程都要维护一个独立的TCP连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class FirstKafkaConsumerThreadDemo {  

public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo"; // 注意这里我将groupid改为了groupId

public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); // 注意这里我将BOOTSTRAP_SERVERS—CONFIG改为了BOOTSTRAP_SERVERS_CONFIG
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return props;
}

public static void main(String[] args) {
Properties props = initConfig();
int consumerThreadNum = 4;
for (int i = 0; i < consumerThreadNum; i++) { // 注意这里我将O改为了0
new KafkaConsumerThread(props, topic).start();
}
}

public static class KafkaConsumerThread extends Thread {

private KafkaConsumer<String, String> kafkaConsumer;

public KafkaConsumerThread(Properties props, String topic) {
this.kafkaConsumer = new KafkaConsumer<>(props);
this.kafkaConsumer.subscribe(Arrays.asList(topic)); // 注意这里我将七his改为了this
}

@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息模块
System.out.printf("Thread %d, offset = %d, key = %s, value = %s%n",
Thread.currentThread().getId(), record.offset(), record.key(), record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 注意:在finally块中关闭KafkaConsumer是不合适的,因为这会阻止线程继续消费消息
// 通常,我们会在线程结束时(例如,在JVM关闭时)通过其他机制来关闭KafkaConsumer
// 如果你确定要在这里关闭它,请确保其他线程不会再次尝试使用它
// kafkaConsumer.close();
}
}
}
}
  • 多个消费线程同时消费同一个分区;通过assign、seek等方法实现,这样可以打破原有消费线程个数不能超过分区数的限制,进一步提高消费能力;但是位移提交、顺序控制比较复杂;实际中使用的比较少,一般不推荐使用。
  • 将消息处理模块改为多线程的实现方式;一般来说,poll拉取消息速度是相当快的,整体消费的瓶颈也正是在处理消息这一块,所以我们将消息处理模块改为多线程的实现方式。相比第一种实现方式,这种方式不仅提供横向扩展能力,还可以减少TCP连接对资源的消耗。考虑位移提交的两种解决方式:a. 引入一个共享变量offset;b. 基于滑动窗口实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// 0. 基于多线程的实现
public class ThirdMultiConsumerThreadDemo {

public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";

public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); // 注意这里我将BOOTSTRAP_SERVERS—CONFIG改为了BOOTSTRAP_SERVERS_CONFIG
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return props;
}

public static void main(String[] args) {
Properties props = initConfig(); // 假设 initConfig() 方法已正确实现

// 创建一个消费者线程,传入配置、主题和线程数量
KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, topic, Runtime.getRuntime().availableProcessors());
consumerThread.start();
}

public static class KafkaConsumerThread extends Thread {

private KafkaConsumer<String, String> kafkaConsumer;
private ExecutorService executorService;
private int threadNumber;

public KafkaConsumerThread(Properties props, String topic, int threadNumber) {
kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Collections.singletonList(topic));
this.threadNumber = threadNumber;

// 创建线程池,注意调整线程池的参数以匹配您的需求
executorService = new ThreadPoolExecutor(
threadNumber,
threadNumber,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}

@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));

// 提交任务到线程池处理记录
for (ConsumerRecord<String, String> record : records) {
executorService.submit(new RecordsHandler(record));
}
}
} catch (Exception e) {
e.printStackTrace();
// 注意:通常不建议在捕获异常后立即关闭消费者,因为这可能会阻止其他线程继续消费
// 如果确定要关闭,请确保其他线程不会再次尝试使用它
// kafkaConsumer.close();
}
}

// 静态内部类,用于处理单个记录
public static class RecordsHandler implements Runnable {

public final ConsumerRecord<String, String> record;

public RecordsHandler(ConsumerRecord<String, String> record) {
this.record = record;
}

@Override
public void run() {
try {
// 在这里处理记录
System.out.println("Thread ID: " + Thread.currentThread().getId() + ", Offset: " + record.offset() + ", Key: " + record.key() + ", Value: " + record.value());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

// 1. 引入一个共享变量offset
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);

// 假设这里你要对tpRecords进行一些处理
// processRecords(tpRecords); // 这只是一个假设的函数,你需要根据自己的需求来实现

// 获取最后一个记录的偏移量
long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();

// 同步块确保对offsets的线程安全访问
synchronized (offsets) {
if (!offsets.containsKey(tp)) {
// 如果offsets中还没有这个TopicPartition的键,则添加一个新的OffsetAndMetadata
// 假设你想把偏移量设为最后消费偏移量+l(l应该是一个提前定义的变量)
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + l));
} else {
// 如果已经存在,则检查当前偏移量是否小于或等于最后消费的偏移量+1
long position = offsets.get(tp).offset();
if (position < lastConsumedOffset + 1) {
// 如果小于,则更新偏移量
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + l));
}
// 注意:如果position大于或等于lastConsumedOffset + 1,则不需要做任何操作
}
}
}

// 2. 基于滑动窗口实现

2.11 重要的消费者参数

  • fetch.min.bytes

该参数用来配置 Consumer 在一次拉取请求(调用 poll () 方法)中能从 Kafka 中拉取的最小数据量,默认值为 1B

  • fetch.max.bytes

该参数与 fetch.min.bytes 参数对应,它用来配置 Consumer 在一次拉取请求中从 Kafka中拉取的最大数据 ,默认值为 52428800B,也就是 50MB 。

  • fetch.max.wait.ms

fetch.max.wait.ms 参数用于指定 Kafka 的等待时间,默认值为 500ms 。如果 Kafka没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500ms

  • max.partition.fetch.bytes

这个参数用来配置从每个分区里返回给 Consumer的最大数据 ,默认值为 1048576 (B),也就是1M

  • max.poll.records

这个参数用来配置 Consumer 次拉取请求中拉取的最大消息数,默认值为 500 (条)

  • connections.max.idle.ms

这个参数用来指定在多久之后关闭限制的连接,默认值是 540000 (ms ),即9分钟。

  • exclude.internal.topics

Kafka 中有两个内部的主题__consumer_offsets、__transaction_state 。该参数用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true 。如果设置 true ,那么只能使用 subscribe( Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为false 则没有这个限制。

  • receive.buffer.bytes

这个参数用来设置 Socket 接收消息缓冲区(SO_RECBU的大小,默认值为 65536B) , 64KB。如果设置为 -1,则使用操作系统的默认值。如果 Consumer和Kafka 处于不同的机房,则可以适当调大这个参数值。

  • send.buffer.bytes
  • request.timeout.ms
  • metadata.max.age.ms
  • reconnect.backoff.ms
  • retry.backoff.ms

这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为 50 ms 。这种机制适用于消费者向 broker 发送的所有请求。

  • isolation.level

这个参数用来配置消费者的事务隔离级别。


本站由 卡卡龙 使用 Stellar 1.27.0 主题创建

本站访问量 次. 本文阅读量 次.