kafka的暂停消费和重新开始消费问题 - 呢喃的歌声

yaohaitao 2021-08-08 原文
  1. //暂停kafka的消费 暂停分区的分配
    consumer.unsubscribe();//此处不取消订阅暂停太久会出现订阅超时的错误
    consumer.pause(consumer.assignment());



    //重新消费分区,此处不重新分配会出错
  1. this.open(null,null,null);
  1. if (null == consumer) {
    Properties props = new Properties();
    props.put("bootstrap.servers", PropertiesUtil.getValue("bootstrap.servers"));
    // 消费者的组id
    props.put("group.id", constant.kafka_groupName);//Spider2
    props.put("enable.auto.commit", "false");
    // max.poll.interval.ms(官网给得默认值为3000)的意思为,当我们从kafkaServer端poll消息时,poll()的调用之间的最大延迟。
    // 这提供了消费者在获取更多记录之前可以空闲的时间量的上限。 如果在此超时到期之前未调用poll(),则认为使用者失败,并且消费
    // 者组将重新平衡以便将分区重新分配给其他消费者,而恰好这里我们设置了Thread.sleep(6000) > max.poll.interval.ms值,
    // 也就是我们在手动提交的时候,实际上分区信息已经被分配到整个消费者组里面的其它消费者了
    props.put("auto.commit.interval.ms", "3000");
    // 从poll(拉)的回话处理时长
    props.put("session.timeout.ms", "100000");
    props.put("request.timeout.ms", "200000");
    props.put("max.poll.records", "2");
    // poll的数量限制
    // props.put("max.poll.records", "100");
    /* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");*/
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("group.name", UUID.randomUUID().toString().replaceAll("-", ""));
    consumer = new KafkaConsumer<String, String>(props);

    // 订阅主题列表topic
    //consumer.subscribe(Arrays.asList("test_input"));
    }
    //注册kafka rebalanceListener
    //consumer.subscribe(Arrays.asList("test_etl"), new ConsumerRebalanceListener(){

    listener = new ConsumerRebalanceListener(){
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    System.out.printf("threadId = {}, onPartitionsRevoked.", Thread.currentThread().getId());
    consumer.commitSync(offsetsMap);
    consumer.commitSync();
    }
    @Override
    public void onPartitionsAssigned(
    Collection<TopicPartition> partitions) {
    System.out.printf("threadId = {}, onPartitionsAssigned.", Thread.currentThread().getId());
    consumer.commitSync();
    offsetsMap.clear();
    }};

    consumer.subscribe(Arrays.asList(topicName.split(",")[0],topicName.split(",")[1],topicName.split(",")[2]), listener);
  1. consumer.resume(consumer.assignment());
发表于
2020-01-09 18:08 
呢喃的歌声 
阅读(4275
评论(0
编辑 
收藏 
举报

 

版权声明:本文为yaohaitao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/yaohaitao/p/12172867.html

kafka的暂停消费和重新开始消费问题 - 呢喃的歌声的更多相关文章

  1. \”学习Unity3D有什么比较好的资料嘛?\” – Caiger

    (该文来自原来老博客这里,也是我之前在知乎上“学习Unity3D有什么比较好的资料嘛?”问题的回答) (本内容 […]...

  2. normalize()错误 – Vulkan

    normalize()错误 struct vecto3 { float x; float y; float z […]...

  3. 光圈快门 – kramer

    光圈快门 光圈特性(一):控制进光量  光圈作为相机镜头内的一个元件,它的作用是控制透过镜头进入机身内感光元件 […]...

  4. JMXtrans + InfluxDB + Grafana实现Zookeeper性能指标监控

    JMXtrans + InfluxDB + Grafana实现Zookeeper性能指标监控 一、总体效果图 […]...

  5. 使用Alcatraz为Xcode安装XActivatePowerMode插件, 从此敲代码逼格大大滴~ – wnfight

    使用Alcatraz为Xcode安装XActivatePowerMode插件, 从此敲代码逼格大大滴~ Xco […]...

  6. LVS:三种负载均衡方式比较+另三种负载均衡方式 – ilinux_one

    LVS:三种负载均衡方式比较+另三种负载均衡方式 转:http://blog.csdn.net/u013256 […]...

  7. Microsoft Band 开发 (2) – 传感器 – Coding 的高同学

    Microsoft Band 开发 (2) – 传感器 距离上一次发文已经过去三个月了,三个月里发生了好多事情 […]...

  8. django 配置 多数据库 – ZealouSnesS

    django 配置 多数据库 2018-04-09 08:44  ZealouSnesS  阅读(2008)  […]...

随机推荐

  1. webpack介绍—上

    6.1 webpack概念的引入   在网页中会引用哪些常见的静态资源? JS   .js、 .jsx 、.c […]...

  2. JavaSE 8 离线API下载

    1、当前可用地址http://www.oracle.com/technetwork/java/javase/d […]...

  3. 为什么catch了异常,但事务还是回滚了?

    前几天我发了这篇文章《我来出个题:这个事务会不会回滚?》得到了很多不错的反馈,也有不少读者通过微信、群或者邮件 […]...

  4. 老爷子这代码,看跪了!

    这是why的第 99 篇原创文章 你好呀,我是why哥。 不是,这个照片不是我,标题说的老爷子就是这个哥们,这 […]...

  5. linux下几款可用网盘对比

    现在的网盘数不胜数,例如115网盘、金山网盘、华为网盘,最近有位业界一哥也侵入相关业务,出了个新贵百度网盘 但 […]...

  6. 笔记本硬盘故障与简单维修

     笔记本硬盘故障与简单维修   dos分区不能启动   1.硬盘含有的dos主分区未被激活,硬盘启动机器时因无 […]...

  7. Hadoop完全分布式搭建

    实验一:基础环境配置。 实验任务一:Linux基础环境配置 步骤一:查看ip 设置静态ip:3.网络配置文件设置1)进入网络配置文件目录cd /etc/sysconfig/network-scripts2)编辑网络配置...

  8. HTML5期末大作业:运动系列——NBA篮球主题学生网页设计(7个页面) HTML+CSS+JavaScript 体育网页设计HTML代码 学生网页课程设计期末作业下载 大学生网页设计制作成

    HTML5期末大作业:运动系列——NBA篮球主题学生网页设计(7个页面) HTML+CSS+JavaScrip […]...