前言

  与生产者客户端一样,消费者端也由最初的scala版本过渡到现在的Java版本。

  正常的消费者逻辑需要以下4个步骤:

  1. KafkaConsumer的客户端参数配置和对应实例;
  2. 订阅主题
  3. 拉取消息并消费
  4. 提交消费者位移
  5. 关闭消费者实例

  消费者客户端比较特殊的一点是加入了消费者组的概念;


 

KafkaConsumer消费者组

  默认情况下:

  • 一个消费者组中的每个消费者会分配到不同的分区;
  • 一个topic中的消息只会被一个消费者组消费一次
  • 若一个组中的消费者数量多于partition数量,会出现消费者不会被分配分区,也就消费不到消息(如下c7消费不到数据);

KafkaConsumer使用示例

 1 public class KafkaConsumerAnalysis {
 2     public static final String brokerList = "10.26.28.99:9092";
 3     public static final String topic = "demo";
 4     public static final String groupId = "group.demo";
 5     public static final AtomicBoolean isRunning = new AtomicBoolean(true);
 6 
 7     public static Properties initConfig() {
 8         Properties props = new Properties();
 9         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
10         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
11         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
12         // 消费者组
13         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
14         // 该客户端的id
15         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
16         return props;
17     }
18 
19     public static void main(String[] args) {
20         Properties props = initConfig();
21         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
22         // 订阅某几个topic
23         consumer.subscribe(Arrays.asList(topic));
24         // 订阅demo的0号分区(订阅方式只能配置一种)
25         // consumer.assign(Arrays.asList(new TopicPartition("demo", 0)));
26         try {
27             while (isRunning.get()) {
28                 //1000ms从服务端拉取一次
29                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
30                 for (ConsumerRecord<String, String> record : records) {
31                     //do something to process record.
32                 }
33             }
34         } catch (Exception e) {
35             log.error("occur exception ", e);
36         } finally {
37             consumer.close();
38         }
39     }
40 }

简单使用示例

 

 

订阅主题与订阅分区(两者中只能配置一种,否则爆出IllegalStateException异常)

//订阅topic
consumer.subscribe(Arrays.asList(topic));
// 订阅demo的0号分区(订阅方式只能配置一种)
consumer.assign(Arrays.asList(new TopicPartition("demo", 0)));

 

KafkaConsumer的反序列化类最好还是使用kafka提供的几种方式:如 StringDeserializer 等;

KafkaConsumer消息消费与唯一提交(TODO)

 

版权声明:本文为whtblog原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/whtblog/p/11421619.html