kafka09-消费者,消费组
1.1.1 概念
1.1.1.1 消费者、消费组
消费者从订阅主题消费数据时,消费偏移量会保存在 __consumer_offsets主题中
消费者还可以将自己的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper。
推荐使用Kafka存储消费者的偏移量。因为Zookeeper不适合高并发
group_id一般设置为应用的逻辑名称。比如多个订单处理程序组成一个消费组,可以设置group_id 为”order_process”。
同组消费者数目为x个,被消费主题的主题分区为y个
- x < y时,一个分区数据,会平均分配在x个消费者中,此时消费组中的消费者可能会占有多个分区,但是同一个分区,必不会被多个消费者占有
- x = y时,x个消费者每个占有一个分区
- x > y时,存在(x – y)个消费者不会消费数据,有y个消费者会消费分区数据(每个消费者消费一个分区数据),不推荐
1.1.1.2 心跳机制
消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。
Kafka 的心跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常 时,Consumer 才会发送心跳。 Consumer 和 Rebalance 相关的 2 个配置参数
参数 | 字段 |
---|---|
session.timeout.ms | MemberMetadata.sessionTimeoutMs |
max.poll.interval.ms | MemberMetadata.rebalanceTimeoutMs |
broker 端:sessionTimeoutMs 参数 broker 处理心跳的逻辑在 GroupCoordinator 类中:如果心跳超期,broker coordinator 会把消费者从 group 中移除,并触发 rebalance。
consumer 端:sessionTimeoutMs,rebalanceTimeoutMs 参数 如果客户端发现心跳超期,客户端会标记 coordinator 为不可用,并阻塞心跳线程;如果poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发 rebalance
1.1.2 接受
1.1.2.1 消息接收
参数 | 说明 |
---|---|
client.id | 当从服务器消费消息的时候向服务器发送的id字符串。在ip/port基础上 提供应用的逻辑名称,记录在服务端的请求日志中,用于追踪请求的 源。 |
group.id | 用于唯一标志当前消费者所属的消费组的字符串。 如果消费者使用组管理功能如subscribe(topic)或使用基于Kafka的偏移 量管理策略,该项必须设置。 |
auto.offset.reset | 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被 删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量 latest:自动重置偏移量为最新的偏移量 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异 常 anything:向消费者抛异常 |
enable.auto.commit | 如果设置为true,消费者会自动周期性地向服务器提交偏移量。 |
1.1.2.2 订阅
consumer 采用 pull 模式从 broker 中读取数据。 采用 pull 模式,consumer 可自主控制消费消息的速率, 可以自己控制消费方式(批量消费/逐条 消费),还可以选择不同的提交方式从而实现不同的传输语义。 consumer.subscribe(“tp_demo_01,tp_demo_02”)
1.1.2.3 反序列化
Kafka的broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化 处理,然后才能交给用户程序消费处理。
消费者的反序列化器包括key的和value的反序列化器。
key.deserializer,value.deserializer 需要实现 org.apache.kafka.common.serialization.Deserializer 接口。
消费者从订阅的主题拉取消息: consumer.poll(3_000); 在Fetcher类中,对拉取到的消息首先进行反序列化处理
1.1.2.4 位移提交
- Consumer需要向Kafka记录自己的位移数据,这个汇报过程称为 提交位移(Committing Offsets)
- Consumer 需要为分配给它的每个分区提交各自的位移数据
- 位移提交的由Consumer端负责的,Kafka只负责保管。__consumer_offsets
- 位移提交分为自动提交和手动提交
- 位移提交分为同步提交和异步提交
1.1.2.4.1 自动提交
Kafka Consumer 后台提交
- 开启自动提交: enable.auto.commit=true
- 配置自动提交间隔:Consumer端: auto.commit.interval.ms ,默认 5s
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "node1:9092");
configs.put("group.id", "mygrp");
// 设置偏移量自动提交。自动提交是默认值。这里做示例。
configs.put("enable.auto.commit", "true");
// 偏移量自动提交的时间间隔
configs.put("auto.commit.interval.ms", "3000");
configs.put("key.deserializer", StringDeserializer.class);
configs.put("value.deserializer", StringDeserializer.class);
1.1.2.4.2 异步提交
- 使用 KafkaConsumer#commitSync():会提交 KafkaConsumer#poll() 返回的最新 offset
- 该方法为同步操作,等待直到 offset 被成功提交才返回
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}
- commitSync 在处理完所有消息之后,手动同步提交可以控制offset提交的时机和频率
- 手动同步提交会:
- 调用 commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果
- 会影响 TPS
- 可以选择拉长提交间隔,但有以下问题
- 会导致 Consumer 的提交频率下降
- Consumer 重启后,会有更多的消息被消费
异步提交
KafkaConsumer#commitAsync
while (true) {
ConsumerRecords<String, String> records = consumer.poll(3_000);
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
handle(exception);
}
});
}
commitAsync出现问题不会自动重试,处理方式
try {
while(true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
1.1.2.5 消费者位移管理
Kafka中,消费者根据消息的位移顺序消费消息。 消费者的位移由消费者管理,可以存储于zookeeper中,也可以存储于Kafka主题 __consumer_offsets中。 Kafka提供了消费者API,让消费者可以管理自己的位移。
API如下:KafkaConsumer
API | 说明 |
---|---|
public void assign(Collection partitions) | 给当前消费者手动分配一系列主题分区。 手动分配分区不支持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。 如果给出的主题分区是空的,则等价于调用unsubscribe方法。 手动分配主题分区的方法不使用消费组管理功能。当消费组成员变了,或者集群或主题的 元数据改变了,不会触发分区分配的再平衡。 手动分区分配assign(Collection)不能和自动分区分配subscribe(Collection, ConsumerRebalanceListener)一起使用。 如果启用了自动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分 配中的消费偏移量进行异步提交。 |
public Set assignment() | 获取给当前消费者分配的分区集合。如果订阅是通过调用assign方法直接分配主题分区, 则返回相同的集合。如果使用了主题订阅,该方法返回当前分配给该消费者的主题分区集 合。如果分区订阅还没开始进行分区分配,或者正在重新分配分区,则会返回none。 |
public Map> listTopics() | 获取对用户授权的所有主题分区元数据。该方法会对服务器发起远程调用 |
public List partitionsFor(String topic) | 获取指定主题的分区元数据。如果当前消费者没有关于该主题的元数据,就会对服务器发 起远程调用。 |
public Map beginningOffsets(Collection partitions) | 对于给定的主题分区,列出它们第一个消息的偏移量。 注意,如果指定的分区不存在,该方法可能会永远阻塞。 该方法不改变分区的当前消费者偏移量。 |
public void seekToEnd(Collection partitions) | 将偏移量移动到每个给定分区的最后一个。 该方法延迟执行,只有当调用过poll方法或position方法之后才可以使用。 如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。 如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费偏移量移 动到最后一个稳定的偏移量,即下一个要消费的消息现在还是未提交状态的事务消息。 |
public void seek(TopicPartition partition, long offset) | 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下一条要消费的消息偏 移量。 若该方法多次调用,则最后一次的覆盖前面的。 如果在消费中间随意使用,可能会丢失数据。 |
public long position(TopicPartition partition) | 检查指定主题分区的消费偏移量 |
public void seekToBeginning(Collection partitions) | 将给定每个分区的消费者偏移量移动到它们的起始偏移量。该方法懒执行,只有当调用过 poll方法或position方法之后才会执行。如果没有提供分区,则将所有分配给当前消费者的 分区消费偏移量移动到起始偏移量。 |
1.1.2.6 再平衡
再平衡可以说是kafka为人诟病最多的一个点了。
再平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每一个分区。 比如一个topic有100个分区,一个消费者组内有20个消费者,在协调者的控制下让组内每一个消费者分 配到5个分区,这个分配的过程就是再平衡。
再平衡的触发条件主要有三个:
- 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。
- 主题的分区数发生变更,kafka目前只支持增加分区,当增加的时候就会触发再平衡
- 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就 会触发再平衡
为什么说再平衡为人诟病呢?
因为再平衡过程中,消费者无法从kafka消费消息,这对kafka的 TPS影响极大,而如果kafka集内节点较多,比如数百个,那再平衡可能会耗时极多。
数分钟到数小时 都有可能,而这段时间kafka基本处于不可用状态。所以在实际环境中,应该尽量避免再平衡发生。
1.1.2.6 避免再平衡
要说完全避免再平衡,是不可能,因为你无法完全保证消费者不会故障。而消费者故障其实也是最 常见的引发再平衡的地方,所以我们需要保证尽力避免消费者故障
而其他几种触发再平衡的方式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主 动控制。
如果消费者真正挂掉了,就没办法了,但实际中,会有一些情况,kafka错误地认为一个正常的消 费者已经挂掉了,我们要的就是避免这样的情况出现。
在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过 重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了。而在kafka消费 者场景中,session.timout.ms参数就是规定这个超时时间是多少。 还有一个参数,heartbeat.interval.ms,这个参数控制发送心跳的频率,频率越高越不容易被误 判,但也会消耗更多资源。
此外,还有最后一个参数,max.poll.interval.ms,消费者poll数据后,需要一些处理,再进行拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取然后处理,这个处理的时间不能超过 max.poll.interval.ms 这个参数的值。这个参数的默认值是 5分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些。
三个参数,
session.timout.ms控制心跳超时时间,
heartbeat.interval.ms控制心跳发送频率,
max.poll.interval.ms控制poll的间隔。
这里给出一个相对较为合理的配置,如下:
- session.timout.ms:设置为6s
- heartbeat.interval.ms:设置2s
- max.poll.interval.ms:推荐为消费者处理消息最长耗时再加1分钟
1.1.2.7 消费者拦截
消费者在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理。
处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程 序进行处理。
消费端定义消息拦截器,需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor
接口。
- 一个可插拔接口,允许拦截甚至更改消费者接收到的消息。首要的用例在于将第三方组件引入 消费者应用程序,用于定制的监控、日志处理等。
- 该接口的实现类通过configre方法获取消费者配置的属性,如果消费者配置中没有指定 clientID,还可以获取KafkaConsumer生成的clientId。获取的这个配置是跟其他拦截器共享 的,需要保证不会在各个拦截器之间产生冲突。
- ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置 了错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。
- ConsumerInterceptor回调发生在 org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)方法同一个线程
1.2.1 消费组
1.2.1 简介
consumer group是kafka提供的可扩展且具有容错性的消费者机制。
三个特性:
- 消费组有一个或多个消费者,消费者可以是一个进程,也可以是一个线程
- group.id是一个字符串,唯一标识一个消费组
- 消费组订阅的主题每个分区只能分配给消费组一个消费者。
1.2.1.1 消费者位移
消费者在消费的过程中记录已消费的数据,即消费位移(offset)信息。 每个消费组保存自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入 checkpoint机制定期持久化。
1.2.2 位移管理
1.2.2.1 自动VS手动
Kafka默认定期自动提交位移( enable.auto.commit = true ),也手动提交位移。另外kafka会定 期把group消费情况保存起来,做成一个offset map,如下图所示:
1.2.2.2 位移提交
位移是提交到Kafka中的 __consumer_offsets 主题。 __consumer_offsets 中的消息保存了每个 消费组某一时刻提交的offset信息。
[root@dw2 ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server dw1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /opt/app/kafka/config/consumer.properties --from-beginning | head
[group_gc11,group_test1,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1629184613128, expireTimestamp=None)
[group_gc11,group_test1,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1629184618128, expireTimestamp=None)
[group_gc11,group_test1,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1629184623131, expireTimestamp=None)
[group_gc11,group_test1,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1629184628131, expireTimestamp=None)
[group_gc11,group_test1,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1629184633133, expireTimestamp=None)
[group_gc11,group_test1,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1629184638134, expireTimestamp=None)
[group_gc11,group_test1,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1629184643135, expireTimestamp=None)
[group_gc11,group_test1,0]::OffsetAndMetadata(offset=2, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1629184680805, expireTimestamp=None)
[group_gc11,group_test1,0]::OffsetAndMetadata(offset=2, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1629184685800, expireTimestamp=None)
[group_gc11,group_test1,0]::OffsetAndMetadata(offset=2, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1629184690803, expireTimestamp=None)
1.2.3 再平衡
1.2.3.1 如何进行组内分区分配
三种分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor
1.2.3.2 谁来执行再均衡和消费组管理?
Kafka提供了一个角色:Group Coordinator来执行对于消费组的管理。
Group Coordinator——每个消费组分配一个消费组协调器用于组管理和位移管理。当消费组的第 一个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费 者和该组协调器协调通信。
1.2.3.3 如何确定coordinator?
两步:
-
确定消费组位移信息写入 consumers_offsets 的哪个分区。
具体计算公式: __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
注意:groupMetadataTopicPartitionCount 由 offsets.topic.num.partitions 指定,默认是50个分区。
-
该分区leader所在的broker就是组协调器。
1.2.3.4 Rebalance Generation
它表示Rebalance之后主题分区到消费组中消费者映射关系的一个版本,主要是用于保护消费组, 隔离无效偏移量提交的。如上一个版本的消费者无法提交位移到新版本的消费组中,因为映射关系变 了,你消费的或许已经不是原来的那个分区了。每次group进行Rebalance之后,Generation号都会加 1,表示消费组和分区的映射关系到了一个新版本,如下图所示: Generation 1时group有3个成员,随 后成员2退出组,消费组协调器触发Rebalance,消费组进入Generation 2,之后成员4加入,再次触发 Rebalance,消费组进入Generation 3
1.2.3.5 协议(protocol)
kafka提供了5个协议来处理与消费组协调相关的问题:
- Heartbeat请求:consumer需要定期给组协调器发送心跳来表明自己还活着
- LeaveGroup请求:主动告诉组协调器我要离开消费组
- SyncGroup请求:消费组Leader把分配方案告诉组内所有成员
- JoinGroup请求:成员请求加入组
- DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息 等。通常该请求是给管理员使用
组协调器在再均衡的时候主要用到了这5种请求。
1.2.3.6 liveness
消费者如何向消费组协调器证明自己还活着? 通过定时向消费组协调器发送Heartbeat请求。如果 超过了设定的超时时间,那么协调器认为该消费者已经挂了。一旦协调器认为某个消费者挂了,那么它 就会开启新一轮再均衡,并且在当前其他消费者的心跳响应中添加“REBALANCE_IN_PROGRESS”,告诉 其他消费者:重新分配分区。
1.2.3.7再均衡过程
再均衡分为2步:Join和Sync
- Join, 加入组。所有成员都向消费组协调器发送JoinGroup请求,请求加入消费组。一旦所有 成员都发送了JoinGroup请求,协调器从中选择一个消费者担任Leader的角色,并把组成员 信息以及订阅信息发给Leader。
- Sync,Leader开始分配消费方案,即哪个消费者负责消费哪些主题的哪些分区。一旦完成分 配,Leader会将这个方案封装进SyncGroup请求中发给消费组协调器,非Leader也会发 SyncGroup请求,只是内容为空。消费组协调器接收到分配方案之后会把方案塞进 SyncGroup的response中发给各个消费者。
注意:消费组的分区分配方案在客户端执行。Kafka交给客户端可以有更好的灵活性。Kafka默认提 供三种分配策略:range和round-robin和sticky。可以通过消费者的参数: partition.assignment.strategy 来实现自己分配策略。
1.2.3.8 消费组状态机
消费组组协调器根据状态机对消费组做不同的处理:
- Dead:组内已经没有任何成员的最终状态,组的元数据也已经被组协调器移除了。这种状态 响应各种请求都是一个response: UNKNOWN_MEMBER_ID
- Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
- PreparingRebalance:组准备开启新的rebalance,等待成员加入
- AwaitingSync:正在等待leader consumer将分配方案传给各个成员
- Stable:再均衡完成,可以开始消费