摘要:Kafka中的位移是个极其重要的概念,因为数据一致性、准确性是一个很重要的语义,我们都不希望消息重复消费或者丢失。而位移就是控制消费进度的大佬。本文就详细聊聊kafka消费位移的那些事,包括:

概念剖析

kafka的两种位移

关于位移(Offset),其实在kafka的世界里有两种位移:

  • 分区位移:生产者向分区写入消息,每条消息在分区中的位置信息由一个叫offset的数据来表征。假设一个生产者向一个空分区写入了 10 条消息,那么这 10 条消息的位移依次是 0、1、…、9;

  • 消费位移:消费者需要记录消费进度,即消费到了哪个分区的哪个位置上,这是消费者位移(Consumer Offset)。

注意,这和上面所说的消息在分区上的位移完全不是一个概念。上面的“位移”表征的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了。而消费者位移则不同,它可能是随时变化的,毕竟它是消费者消费进度的指示器。

消费位移

消费位移,记录的是 Consumer 要消费的下一条消息的位移,切记,是下一条消息的位移! 而不是目前最新消费消息的位移

假设一个分区中有 10 条消息,位移分别是 0 到 9。某个 Consumer 应用已消费了 5 条消息,这就说明该 Consumer 消费了位移为 0 到 4 的 5 条消息,此时 Consumer 的位移是 5,指向了下一条消息的位移。

至于为什么要有消费位移,很好理解,当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。就好像书签一样,需要书签你才可以快速找到你上次读书的位置。

那么了解了位移是什么以及它的重要性,我们自然而然会有一个疑问,kafka是怎么记录、怎么保存、怎么管理位移的呢?

位移的提交

Consumer 需要上报自己的位移数据,这个汇报过程被称为位移提交。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即Consumer 需要为分配给它的每个分区提交各自的位移数据。

鉴于位移提交甚至是位移管理对 Consumer 端的巨大影响,KafkaConsumer API提供了多种提交位移的方法,每一种都有各自的用途,这些都是本文将要谈到的方案。

void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

先粗略的总结一下。位移提交分为自动提交和手动提交;而手动提交又分为同步提交和异步提交。

自动提交

当消费配置enable.auto.commit=true的时候代表自动提交位移。

自动提交位移是发生在什么时候呢?auto.commit.interval.ms默认值是50000ms。即kafka每隔5s会帮你自动提交一次位移。自动位移提交的动作是在 poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。假如消费数据量特别大,可以设置的短一点。

越简单的东西功能越不足,自动提交位移省事的同时肯定会带来一些问题。自动提交带来重复消费和消息丢失的问题:

  • 重复消费: 在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

  • 消息丢失: 假设拉取了100条消息,正在处理第50条消息的时候,到达了自动提交窗口期,自动提交线程将拉取到的每个分区的最大消息位移进行提交,如果此时消费服务挂掉,消息并未处理结束,但却提交了最大位移,下次重启就从100条那消费,即发生了50-100条的消息丢失。

手动提交

当消费配置enable.auto.commit=false的时候代表手动提交位移。用户必须在适当的时机(一般是处理完业务逻辑后),手动的调用相关api方法提交位移。比如在下面的案例中,我需要确认我的业务逻辑返回true之后再手动提交位移

 while (true) {
     try {
         ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
         if (!consumerRecords.isEmpty()) {
             for (ConsumerRecord<String, String> record : consumerRecords) {
                 KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                 // 处理业务
                 boolean handleResult = handle(kafkaMessage);
                 if (handleResult) {
                     log.info(" handle success, kafkaMessage={}" ,kafkaMessage);
                 } else {
                     log.info(" handle failed, kafkaMessage={}" ,kafkaMessage);
                 }
             }
             // 手动提交offset
             consumer.commitSync(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
        
         } 
     } catch (Exception e) {
         log.info("kafka consume error." ,e);
     }
 }

手动提交明显能解决消息丢失的问题,因为你是处理完业务逻辑后再提交的,假如此时消费服务挂掉,消息并未处理结束,那么重启的时候还会重新消费。

但是对于业务层面的失败导致消息未消费成功,是无法处理的。因为业务层的逻辑千变万化、比如格式不正确,你叫Kafka消费端程序怎么去处理?应该要业务层面自己处理,记录失败日志做好监控等。

但是手动提交不能解决消息重复的问题,也很好理解,假如消费0-100条消息,50条时挂了,重启后由于没有提交这一批消息的offset,是会从0开始重新消费。至于如何避免重复消费的问题,在这篇文章有说。

手动提交又分为异步提交和同步提交。

同步提交

上面案例代码使用的是commitSync() ,顾名思义,是同步提交位移的方法。同步提交位移Consumer 程序会处于阻塞状态,等待 Broker 返回提交结果。同步模式下提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。当然,你可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。因此,为了解决这些不足,kafka还提供了异步提交方法。

异步提交

异步提交会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数,供你实现提交之后的逻辑,比如记录日志或处理异常等。下面这段代码展示了调用 commitAsync() 的方法

 consumer.commitAsync((offsets, exception) -> {
 if (exception != null)
     handleException(exception);
 });

但是异步提交会有一个问题,那就是它没有重试机制,不过一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。所以消息也是不会丢失和重复消费的。
但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。因此,组合使用commitAsync()commitSync()是最佳的方式。

try {
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
        if (!consumerRecords.isEmpty()) {
             for (ConsumerRecord<String, String> record : consumerRecords) {
                KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                boolean handleResult = handle(kafkaMessage);             
             }
             //异步提交位移               
             consumer.commitAsync((offsets, exception) -> {
             if (exception != null)
                 handleException(exception);
             });
           
        }
    }
} catch (Exception e) {
    System.out.println("kafka consumer error:" + e.toString());
} finally {
    try {
        //最后同步提交位移
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

让位移提交更加灵活和可控

如果细心的阅读了上面所有demo的代码,那么你会发现这样几个问题:

1、所有的提交,都是提交 poll 方法返回的所有消息的位移,poll 方法一次返回1000 条消息,则一次性地将这 1000 条消息的位移一并提交。可这样一旦中间出现问题,位移没有提交,下次会重新消费已经处理成功的数据。所以我想做到细粒度控制,比如每次提交100条,该怎么办?

答:可以通过commitSync(Map<TopicPartition, OffsetAndMetadata>)commitAsync(Map<TopicPartition, OffsetAndMetadata>)对位移进行精确控制。

2、poll和commit方法对于普通的开发人员而言是一个黑盒,无法精确地掌控其消费的具体位置。我都不知道这次的提交,是针对哪个partition,提交上去的offset是多少。

答:可以通过record.topic()获取topic信息, record.partition()获取分区信息,record.offset() + 1获取消费位移,记住消费位移是指示下一条消费的位移,所以要加一。

3、我想自己管理offset怎么办?一方面更加保险,一方面下次重启之后可以精准的从数据库读取最后的offset就不存在丢失和重复消费了。
答:可以将消费位移保存在数据库中。消费端程序使用comsumer.seek方法指定从某个位移开始消费。

综合以上几个可优化点,并结合全文,可以给出一个比较完美且完整的demo:联合异步提交和同步提交,对处理过程中所有的异常都进行了处理。细粒度的控制了消费位移的提交,并且保守的将消费位移记录到了数据库中,重新启动消费端程序的时候会从数据库读取位移。这也是我们消费端程序位移提交的最佳实践方案。你只要继承这个抽象类,实现你具体的业务逻辑就可以了。

public abstract class PrefectCosumer {
    private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    int count = 0;
    public final void consume() {
        Properties properties = PropertiesConfig.getConsumerProperties();
        properties.put("group.id", getGroupId());
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(getTopics());
        consumer.poll(0);
        // 把offset记录到数据库中 从指定的offset处消费 
        consumer.partitionsFor(getTopics()).stream().map(info ->
        new TopicPartition(getTopics(), info.partition()))
        .forEach(tp -> {
               consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));   
         });
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
                if (!consumerRecords.isEmpty()) {
                    for (ConsumerRecord<String, String> record : consumerRecords) {

                        KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                        boolean handleResult = handle(kafkaMessage);
                        if (handleResult) {
                            //注意:提交的是下一条消息的位移。所以OffsetAndMetadata 对象时,必须使用当前消息位移加 1。
                            offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));

                            // 细粒度控制提交 每10条提交一次offset
                            if (count % 10 == 0) {
                                // 异步提交offset
                                consumer.commitAsync(offsets, (offsets, exception) -> {
                                    if (exception != null) {
                                        handleException(exception);
                                    }
                                    // 将消费位移再记录一份到数据库中
                                    offsets.forEach((k, v) -> {
                                        String s = "insert into kafka_offset(`topic`,`group_id`,`partition_id`,`offset`) values" +
                                                " ('" + k.topic() + "','" + getGroupId() + "'," + k.partition() + "," + v.offset() + ")" +
                                                " on duplicate key update offset=values(offset);";
                                        JdbcUtils.insertTable(s);
                                    });


                                });
                            }
                            count++;
                        } else {         
                            System.out.println("消费消息失败 kafkaMessage={}" + getTopics() + getGroupId() + kafkaMessage.toString());                         
                        }
                    }


                }
            }
        } catch (Exception e) {
            System.out.println("kafka consumer error:" + e.toString());
        } finally {
            try {
                // 最后一次提交 使用同步提交offset
                consumer.commitSync();
            } finally {
                consumer.close();
            }


        }
    }


    /**
     * 具体的业务逻辑
     *
     * @param kafkaMessage
     * @return
     */
    public abstract boolean handle(KafkaMessage kafkaMessage);

    public abstract List<String> getTopics();

    public abstract String getGroupId();

    void handleException(Exception e) {
        //异常处理
    }
}

控制位移提交的N种方式

刚刚我们说自己控制位移,使用seek方法可以指定offset消费。那到底怎么控制位移?怎么重设消费组位移?seek是什么?现在就来仔细说说。

并不是所有的消息队列都可以重设消费者组位移达到重新消费的目的。比如传统的RabbitMq,它们处理消息是一次性的,即一旦消息被成功消费,就会被删除。而Kafka消费消息是可以重演的,因为它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,所以消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。

了解如何重设位移是很重要的。假设这么一个场景,我已经消费了1000条消息后,我发现处理逻辑错了,所以我需要重新消费一下,可是位移已经提交了,我到底该怎么重新消费这1000条呢??假设我想从某个时间点开始消费,我又该如何处理呢?

首先说个误区:auto.offset.reset=earliest/latest这个参数大家都很熟悉,但是初学者很容易误会它。大部分朋友都觉得在任何情况下把这两个值设置为earliest或者latest ,消费者就可以从最早或者最新的offset开始消费,但实际上并不是那么回事,他们生效都有一个前提条件,那就是对于同一个groupid的消费者,如果这个topic某个分区有已经提交的offset,那么无论是把auto.offset.reset=earliest还是latest,都将失效,消费者会从已经提交的offset开始消费。因此这个参数并不能解决用户想重设消费位移的需求。

kafka有七种控制消费组消费offset的策略,主要分为位移维度和时间维度,包括:

  • 位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。包括Earliest/Latest/Current/Specified-Offset/Shift-By-N策略

  • 时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。包括DateTime和Duration策略

说完了重设策略,我们就来看一下具体应该如何实现,可以从两个角度,API方式和命令行方式。

重设位移的方法之API方式

API方式只要记住用seek方法就可以了,包括seek,seekToBeginning 和 seekToEnd。

void seek(TopicPartition partition, long offset);    
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);    
void seekToBeginning(Collection<TopicPartition> partitions);    
void seekToEnd(Collection<TopicPartition> partitions);    

从方法签名我们可以看出seekToBeginningseekToEnd是可以一次性重设n个分区的位移,而seek 只允许重设指定分区的位移,即为每个分区都单独设置位移,因为不难得出,如果要自定义每个分区的位移值则用seek,如果希望kafka帮你批量重设所有分区位移,比如从最新数据消费或者从最早数据消费,那么用seekToEnd和seekToBeginning。

Earliest 策略:从最早的数据开始消费

从主题当前最早位移处开始消费,这个最早位移不一定就是 0 ,因为很久远的消息会被 Kafka 自动删除,主要取决于你的删除配置。

代码如下:

Properties properties = PropertiesConfig.getConsumerProperties();
properties.put("group.id", getGroupId());
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(getTopics());
consumer.poll(0);
consumer.seekToBeginning(
consumer.partitionsFor(getTopics()).stream().map(partitionInfo ->
   new TopicPartition(getTopics(), partitionInfo.partition()))
   .collect(Collectors.toList()));

首先是构造consumer对象,这样我们可以通过partitionsFor获取到分区的信息,然后我们就可以构造出TopicPartition集合,传给seekToBegining方法。需要注意的一个地方是:需要用consumer.poll(0),而不能用consumer.poll(Duration.ofMillis(0))

在poll(0)中consumer会一直阻塞直到它成功获取了所需的元数据信息,之后它才会发起fetch请求去获取数据。而poll(Duration)会把元数据获取也计入整个超时时间。由于本例中使用的是0,即瞬时超时,因此consumer根本无法在这么短的时间内连接上coordinator,所以只能赶在超时前返回一个空集合。

Latest策略:从最新的数据开始消费

    consumer.seekToEnd(
        consumer.partitionsFor(getTopics().get(0)).stream().map(partitionInfo ->
            new TopicPartition(getTopics().get(0), partitionInfo.partition()))
              .collect(Collectors.toList()));

Current策略:从当前已经提交的offset处消费

consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
        new TopicPartition(getTopics().get(0), info.partition()))
        .forEach(tp -> {
            long committedOffset = consumer.committed(tp).offset();
            consumer.seek(tp, committedOffset);
        });

**Special-offset策略:从指定的offset处消费 **

该策略使用的方法和current策略一样,区别在于,current策略是直接从kafka元信息中读取中已经提交的offset值,而special策略需要用户自己为每一个分区指定offset值,我们一般是把offset记录到数据库中然后可以从数据库去读取这个值

    consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
                new TopicPartition(getTopics().get(0), info.partition()))
                .forEach(tp -> {
                    try {
                        consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                });

以上演示了用API方式重设位移,演示了四种常见策略的代码,另外三种没有演示,一方面是大同小异,另一方面在实际生产中,用API的方式不太可能去做时间维度的重设,而基本都是用命令行方式。

重设位移的方法之命令行方式

命令行方式重设位移是通过 kafka-consumer-groups 脚本。比起 API 的方式,用命令行重设位移要简单得多。

Earliest 策略指定–to-earliest。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute

Latest 策略指定–to-latest。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute

Current 策略指定–to-current。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute

Specified-Offset 策略指定–to-offset。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute

Shift-By-N 策略指定–shift-by N。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute

DateTime 策略指定–to-datetime。

DateTime 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute

Duration 策略指定–by-duration。
Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 类的话,你应该不会对这个格式感到陌生。它就是一个符合 ISO-8601 规范的 Duration 格式,以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S,分别表示天、小时、分钟和秒。举个例子,如果你想将位移调回到 15 分钟前,那么你就可以指定 PT0H15M0S

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

提交的位移都去哪了?

通过上面那几部分的内容,我们已经搞懂了位移提交的方方面面,那么提交的位移它保存在哪里呢?这就要去位移主题的的世界里一探究竟了。kafka把位移保存在一个叫做__consumer_offsets的内部主题中,叫做位移主题。

注意:老版本的kafka其实是把位移保存在zookeeper中的,但是zookeeper并不适合这种高频写的场景。所以新版本已经是改进了这个方案,直接保存到kafka。毕竟kafka本身就适合高频写的场景,并且kafka也可以保证高可用性和高持久性。

既然它也是主题,那么离不开分区和副本这两个机制。我们并没有手动创建这个主题并且指定,所以是kafka自动创建的, 分区的数量取决于Broker 端参数 offsets.topic.num.partitions,默认是50个分区,而副本参数取决于offsets.topic.replication.factor,默认是3。

既然也是主题,肯定会有消息,那么消息格式是什么呢?参考前面我们手动设计将位移写入数据库的方案,我们保存了topic,group_id,partition,offset四个字段。topic,group_id,partition无疑是数据表中的联合主键,而offset是不断更新的。无疑kafka的位移主题消息也是类似这种设计。key也是那三个字段,而消息体其实很复杂,你可以先简单理解为就是offset。

既然也是主题,肯定也会有删除策略,否则消息会无限膨胀。但是位移主题的删除策略和其他主题删除策略又不太一样。我们知道普通主题的删除是可以通过配置删除时间或者大小的。而位移主题的删除,叫做 Compaction。Kafka 使用Compact 策略来删除位移主题中的过期消息,对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。

Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。

总结

kafka的位移是个极其重要的概念,控制着消费进度,也即控制着消费的准确性,完整性,为了保证消息不重复和不丢失。我们最好做到以下几点:

  • 手动提交位移。

  • 手动提交有异步提交和同步提交两种方式,既然两者有利也有弊,那么我们可以结合起来使用。

  • 细粒度的控制消费位移的提交,这样可以避免重复消费的问题。

  • 保守的将消费位移再记录到了数据库中,重新启动消费端程序的时候从数据库读取位移。

获取Kafka全套原创学习资料及思维导图,关注【胖滚猪学编程】公众号,回复”kafka”。

本文来源于公众号:【胖滚猪学编程】。一枚集颜值与才华于一身,不算聪明却足够努力的女程序媛。用漫画形式让编程so easy and interesting!求关注!

版权声明:本文为liuyanling原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/liuyanling/p/13149400.html