1. 消费者与消费者组
消费者负责订阅kafka中的主题,并且从订阅的主题上拉取消息。每个消费者都有一个对应的消费组,当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。消费者并非逻辑上的概念,它是实际的应用实例,可以是一个线程,可以是一个进程,同一个消费组内的消费者可以部署在同一台机器上,也可以部署在不同的机器上。
消费组是逻辑上的一个概念,它将旗下的消费者归为一类;每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,可以通过消费者客户端参数group.id来配置,默认是一个空的字符串。
举例说明:
- 某个主题四个分区:p0、p1、p2、p3,有两个消费组A和B都订阅了这个主题,消费组A中有四个消费者(A1、A2、A3、A4),消费组B中有两个消费者(B1、B2)。
- 某个主题七个分区:p0、p1、p2、p3、p4、p5、p6,消费组C订阅了该主题
- 消费组内有八个消费者C0、C1、C2、C3、C4、C5、C6、C7
2. 客户端开发
在Kafka的历史中,消费者客户端同生产者客户端一样也经历了两个大版本:第一个是于Kafka开源之初使用Scala语言编写的客户端,我们可以称之为旧消费者客户端(Old Consumer) 或 Scala消费者客户端;第二个是从Kafka 0.9.x版本开始推出的使用Java编写的客户端,我们可以称之为新消费者客户端(New Consumer)或Java消费者客户端,它弥补了旧客户端中存在的诸多设计缺陷。
一个正常的消费逻辑需要具备以下几个步骤:
- 配置消费者客户端参数以及创建响应的消费者实例
- 订阅主题
- 拉取消息并消费
- 提交消费位移
- 关闭消费者实例
消费者客户端示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class KafkaConsumerAnalysis { public static final String brokerList = "localhost:9092"; public static final String topic = "topic-demo"; public static final String groupId = "group.demo"; public static final AtomicBoolean isRunning = new AtomicBoolean(true); public static Properties initConfig() { Properties props = new Properties(); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("bootstrap.servers", brokerList); props.put("group.id", groupId); props.put("client.id", "consumer.client.id.demo"); return props; } public static void main(String[] args) { Properties props = initConfig(); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); try { while (isRunning.get()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset()); System.out.println("key = " + record.key() + ", value = " + record.value()); } } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } } }
|
2.1 必要的参数配置
- bootstrap.servers: 指定连接kafka集群所需要的broker地址清单;
- group.id: 消费者隶属消费组的名称,默认值为:“”
- key.serializer和value.serializer: 与生产者客户端的key.serializer和value.serializer参数对应,消费者从broker获取消息格式都是字节数组类型,所以需要执行响应的反序列化操作还原为对应的对象格式。
2.2 订阅主题与分区
一个消费者可以订阅一个或者多个主题,对于subscribe这个方法而言,既可以以集合的形式订阅多个主题,也可以正则表达式的形式订阅特定模式的主题,还可以指定分区订阅
1 2 3 4 5
| public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) ;
public void subscribe(Collection<String> topics) ;
|
1 2 3 4 5
| public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) ;
public void subscribe(Pattern pattern) ;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public void assign(Collection<TopicPartition> partitions);
consumer.assign(Arrays.asList(new TopicPartition("topic-demo", 0)));
public List<Partitioninfo> partitionsFor(String topic) 其中 Partitionlnfo类型即为主题的分区元数据信息, 此类的主要结构如下: public class Partitioninfo { private final String topic; private final int partition; private final Node leader; private final Node[] replicas; private final Node[] inSyncReplicas; private final Node[] offlineReplicas; //这里省略了构造函数、属性提取、toString等方法 }
|
1 2 3
| consumer.unsubscribe(); consumer.subscribe(new ArrayList<String>()); consumer.assign(new ArrayList<TopicPartition>());
|
2.3 反序列化
与序列化器成对出现;注意如无特殊需要,还是不建议使用自定义的序列化和反序列化器,那样会增加消费者和生产者的耦合度,在系统升级换代时候容易出错。在设计应用中,如果现有的序列化和反序列化器满足不了需要的前提下,推荐使用Avro、JSON、Thrift、ProtoBuf等等通用的序列化工具来进行包装,以求实现尽可能更加通用的向后兼容。
StringSerializer的代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class StringDeserializer implements Deserializer<String> { private String encoding = "UTF-8"; @Override public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) { encodingValue = configs.get("deserializer.encoding"); } if (encodingValue != null && encodingValue instanceof String) { encoding = (String) encodingValue; } } @Override public String deserialize(String topic, byte[] data) { try { if (data == null) { return null; } else { return new String(data, encoding); } } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); } } @Override public void close() { } }
|
自定义的序列化器CompanySerializer的代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class CompanyDeserializer implements Deserializer<Company> { @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public Company deserialize(String topic, byte[] data) { if (data == null) { return null; } if (data.length < 8) { throw new SerializationException("Size of data received by CompanyDeserializer is shorter than expected!"); } ByteBuffer buffer = ByteBuffer.wrap(data); int nameLen, addressLen; String name, address; nameLen = buffer.getInt(); byte[] nameBytes = new byte[nameLen]; buffer.get(nameBytes); addressLen = buffer.getInt(); byte[] addressBytes = new byte[addressLen]; buffer.get(addressBytes); try { name = new String(nameBytes, StandardCharsets.UTF_8); address = new String(addressBytes, StandardCharsets.UTF_8); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error occurred when deserializing", e); } return new Company(name, address); } @Override public void close() { }
}
|
使用Protostuff实现自定义的序列化和反序列化器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public byte[] serialize(String topic, Company data) { if (data == null) { return null; } Schema<Company> schema = (Schema<Company>) RuntimeSchema.getSchema(Company.class); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); byte[] protostuff = null; try { protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } return protostuff; }
public Company deserialize(String topic, byte[] data) { if (data == null) { return null; } Schema<Company> schema = (Schema<Company>) RuntimeSchema.getSchema(Company.class); Company ans = new Company(); try { ProtostuffIOUtil.mergeFrom(data, ans, schema); } catch (Exception e) { throw new RuntimeException("Error occurred during deserialization", e); } return ans; }
|
2.4 消息消费
Kafka中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。
poll()方法的定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public ConsumerRecords<K, V> poll(final Duration timeout)
public class ConsumerRecord<K, V> { private final String topic; private final int partition; private final long offset; private final long timestamp; private final TimestampType timestampType; private final int serializedKeySize; private final int serializedValueSize; private final Headers headers; private final K key; private final V value; private volatile Long checksum; //省略若干方法 }
public Iterator<ConsumerRecord<K, V>> iterator()
public List<ConsumerRecord<K, V>> records(TopicPartition partition)
public Iterable<ConsumerRecord<K, V>> records(String topic)
|
2.5 位移提交
offset的概念:对于消费者消费到的位置(消费层面),将offset称为“位移”,对于消息在分区中的位置(存储层面),将offset称为“偏移量”。
在每次调用 poll () 方法时,它返回的是还没有被消费过的消息集(当然这个前提是消息己经存储在 Kafka 了,并且暂不考虑异常情况的发生) 要做到这一点,就需要记录上次消费时的消费位移,并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移;再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作 对于同 分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的 而在新消费者客户端中,消费位移存储在 Kafka内部的主题 _consumer_offsets 这里把将消费位移存储起来(持久化)的动作称为“提交” ,消费者在消费完消息之后需要执行消费位移的提交。
lastConsumedOffset、committed offset、position的概念
- lastConsumedOffset:当前消费到的位置
- committed offset:已经提交过的消费位移
- position:下一次需要拉取的消息位置
对于位移提交具体时机的把握不同,可能会造成重复消费和消息丢失现象。
在kafka中默认的消费位移的提交方式是自动提交,这个是消费者客户端参数enable.auto.commit配置,默认值是true,但是注意这个默认的自动提交不是每消费一条消息就会提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值是5s。
在默认方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交动位移提交的动作是在poll ()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
缺点:重复消费、消息丢失
开启手动提交只要将enable.auto.commit配置为false,细分为同步提交和异步提交,对应于kafka中的commitSync()
和commitAsync()
两种类型的方法。
1 2 3 4 5 6
| commitSync()只能提交当前批次对应的position值; commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets) 提交指定偏移量 poll()方法会根据最新位移来进行提交,只要没有发生不可恢复的错误,就会阻塞消费线程直至位移提交完成。 commitAsync() commitAsync(OffsetCommitCallback callback) commitAsync(final Map<TopicPartition,OffsetAndMetadata> offsets,OffsetCommitCallback callback)
|
提交失败如何处理?重试,重试会造成重复消费问题,为此我们可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的。如果位移提交失败经常发生,那么说明系统肯定出现了故障。一般情况下,位移提交失败很少发生,不重试也没关系,后面提交也会成功的。重试会增加代码逻辑复杂度,不重试可能会增加重复消费的概率。
2.6 控制或关闭消费
1 2 3 4 5 6 7 8 9 10
| public void pause(Collect on<Top cPartition> part ons) public roid resume(Collection<TopicPartition> partitions)
public Set<TopicPartition> paused ()
public void close() public void close(Duration timeout) @Deprecated public void close(long timeout , TimeUnit timeUnit)
|
2.7 指定位移消费
auto.offset.reset 参数默认配置“latest”,“earliest”,”none”
1
| public void seek(TopicPartition partition long offset)
|
seek() 方法中的参数 partition 表示分区,而 offset 参数用来指定从分区的哪个位置开始消费。
2.8 再均衡
再均衡:分区的所属权从一个消费者转移到另 一消费者的行为, 它为消费组具备高可用性和伸缩性提供保障, 使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。
缺点:消费者无法读取消息、造成重复消费现象
再均衡监听器的作用:用来设定发生再均衡动作前后的 一些准备或收尾的动作。
使用例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { consumer.commitSync(currentOffsets); currentOffsets.clear(); } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } });
|
2.9 消费者拦截器
主要在消费到消息或在提交消费位移时进行一些定制化操作;
2.10 多线程实现
首先KafkaConsumer是非线程安全的;kafkaConsumer非线程安全并非意味着我们在消费的时候只能以单线程的方式执行。多线程的实现方式有以下几种:
- 线程封闭;即为每一个线程实例化一个KafkaConsumer对象,该线程被称为消费线程,一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数,如果消费线程的个数大于分区数,就会有部分消费线程一直处于空闲状态。这种多线程的实现方式和开启多个消费进程的实现方式没有本质区别,优点是可以按照顺序消费每个分区中的消息,缺点是每个消费线程都要维护一个独立的TCP连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public class FirstKafkaConsumerThreadDemo { public static final String brokerList = "localhost:9092"; public static final String topic = "topic-demo"; public static final String groupId = "group.demo"; public static Properties initConfig() { Properties props = new Properties(); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); return props; } public static void main(String[] args) { Properties props = initConfig(); int consumerThreadNum = 4; for (int i = 0; i < consumerThreadNum; i++) { new KafkaConsumerThread(props, topic).start(); } } public static class KafkaConsumerThread extends Thread { private KafkaConsumer<String, String> kafkaConsumer; public KafkaConsumerThread(Properties props, String topic) { this.kafkaConsumer = new KafkaConsumer<>(props); this.kafkaConsumer.subscribe(Arrays.asList(topic)); } @Override public void run() { try { while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Thread %d, offset = %d, key = %s, value = %s%n", Thread.currentThread().getId(), record.offset(), record.key(), record.value()); } } } catch (Exception e) { e.printStackTrace(); } finally { } } } }
|
- 多个消费线程同时消费同一个分区;通过assign、seek等方法实现,这样可以打破原有消费线程个数不能超过分区数的限制,进一步提高消费能力;但是位移提交、顺序控制比较复杂;实际中使用的比较少,一般不推荐使用。
- 将消息处理模块改为多线程的实现方式;一般来说,poll拉取消息速度是相当快的,整体消费的瓶颈也正是在处理消息这一块,所以我们将消息处理模块改为多线程的实现方式。相比第一种实现方式,这种方式不仅提供横向扩展能力,还可以减少TCP连接对资源的消耗。考虑位移提交的两种解决方式:a. 引入一个共享变量offset;b. 基于滑动窗口实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
| public class ThirdMultiConsumerThreadDemo { public static final String brokerList = "localhost:9092"; public static final String topic = "topic-demo"; public static final String groupId = "group.demo"; public static Properties initConfig() { Properties props = new Properties(); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); return props; } public static void main(String[] args) { Properties props = initConfig(); KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, topic, Runtime.getRuntime().availableProcessors()); consumerThread.start(); } public static class KafkaConsumerThread extends Thread { private KafkaConsumer<String, String> kafkaConsumer; private ExecutorService executorService; private int threadNumber; public KafkaConsumerThread(Properties props, String topic, int threadNumber) { kafkaConsumer = new KafkaConsumer<>(props); kafkaConsumer.subscribe(Collections.singletonList(topic)); this.threadNumber = threadNumber; executorService = new ThreadPoolExecutor( threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy() ); } @Override public void run() { try { while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { executorService.submit(new RecordsHandler(record)); } } } catch (Exception e) { e.printStackTrace(); } } public static class RecordsHandler implements Runnable { public final ConsumerRecord<String, String> record; public RecordsHandler(ConsumerRecord<String, String> record) { this.record = record; } @Override public void run() { try { System.out.println("Thread ID: " + Thread.currentThread().getId() + ", Offset: " + record.offset() + ", Key: " + record.key() + ", Value: " + record.value()); } catch (Exception e) { e.printStackTrace(); } } } } }
for (TopicPartition tp : records.partitions()) { List<ConsumerRecord<String, String>> tpRecords = records.records(tp); long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset(); synchronized (offsets) { if (!offsets.containsKey(tp)) { offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + l)); } else { long position = offsets.get(tp).offset(); if (position < lastConsumedOffset + 1) { offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + l)); } } } }
|
2.11 重要的消费者参数
该参数用来配置 Consumer 在一次拉取请求(调用 poll () 方法)中能从 Kafka 中拉取的最小数据量,默认值为 1B
该参数与 fetch.min.bytes 参数对应,它用来配置 Consumer 在一次拉取请求中从 Kafka中拉取的最大数据 ,默认值为 52428800B,也就是 50MB 。
fetch.max.wait.ms 参数用于指定 Kafka 的等待时间,默认值为 500ms 。如果 Kafka没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500ms
- max.partition.fetch.bytes
这个参数用来配置从每个分区里返回给 Consumer的最大数据 ,默认值为 1048576 (B),也就是1M
这个参数用来配置 Consumer 次拉取请求中拉取的最大消息数,默认值为 500 (条)
这个参数用来指定在多久之后关闭限制的连接,默认值是 540000 (ms ),即9分钟。
Kafka 中有两个内部的主题__consumer_offsets、__transaction_state 。该参数用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true 。如果设置 true ,那么只能使用 subscribe( Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为false 则没有这个限制。
这个参数用来设置 Socket 接收消息缓冲区(SO_RECBU的大小,默认值为 65536B) , 64KB。如果设置为 -1,则使用操作系统的默认值。如果 Consumer和Kafka 处于不同的机房,则可以适当调大这个参数值。
这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。如果设置为-1,则使用操作系统的默认值。
这个参数用来配置Consumer等待请求响应的最长时间,默认值为30000(ms)
这个参数用来配置元数据的过期时间,默认值为300000(ms),即5min。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或新的broker加入。
这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50ms。这种机制适用于消费者向broker发送的所有请求。
这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为 50 ms 。这种机制适用于消费者向 broker 发送的所有请求。
这个参数用来配置消费者的事务隔离级别。字符串类型,有效值为“read uncommitted ,,和“ read committed ",表示消费者所消费到的位置,如果设置为“read committed ”,那么消费者就会忽略事务未提交的消息,即只能消费到 LSO ( LastStableOffset )的位置,默认情况下为“ read_ uncommitted ”,即可以消 费到 HW (High Watermark )处的位置。