Kafka常用命令

  日常进程kafka运维操作时,除了借助kafka-manager图形工具查看kafka信息时,我们还可以通过kafka提供的命令工具来进行kafka的操作与查看。

1、查看所有topic

./kafka-topics.sh --zookeeper=10.111.30.3:2181,10.111.30.10:2181,10.111.30.5:2181 topic --list

2、删除某个topic

./kafka-topics.sh --zookeeper=10.111.30.3:2181,10.111.30.10:2181,10.111.30.5:2181 --topic test  --delete

3、命令行消费某个topic的消息

./kafka-console-consumer.sh --bootstrap-server=10.111.30.3:9092 --topic=loc --from-beginning  #从头开始消费

4、kafka重置分组已经消费的偏移量offest

./kafka-consumer-groups.sh --bootstrap-server=10.111.30.10:9092,10.111.30.5:9092,10.111.30.3:9092 --execute --reset-offsets --topic=loc_topic1 --group=testPlatform-local-pha  --to-earliest

5、命令行创建主题,指定该主题的定时清理时间

./kafka-topics.sh --create --zookeeper localhost:2181 --topic test  --partitions 1 --replication-factor 1 --config retention.ms=172800000

6、查看topic属性

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic access

7、查看分组所属topic的消费情况

./kafka-consumer-groups.sh --bootstrap-server=10.2.48.31:9092,10.2.32.26:9092,10.2.16.25:9092  --group=testPlatform --describe

8、topic增加分区

./kafka-topics.sh --alter --zookeeper 192.168.119.131:2181 --topic yqtopic1 --partitions 12   #分区数增加到12个,会引起rebalance操作

9、创建主题(必须指定分区和副本,否则会报错)

./kafka-topics.sh --zookeeper=10.111.30.3:2181,10.111.30.10:2181,10.111.30.5:2181 --create --topic TEST --partitions 1 -replication-factor 1

10、指定topic创建消费者分组

./kafka-console-consumer.sh  --bootstrap-server=10.111.30.3:9092,10.111.30.10:9092,10.111.30.5:9092 --topic TEST --consumer-property group.id=test1

11、将某个consumer group 的某个topic或所有topic重置到某个时间点

相关格式:

--to-current                            Reset offsets to current offset.       
--to-datetime <String: datetime>        Reset offsets to offset from datetime. 
                                          Format: \'YYYY-MM-DDTHH:mm:SS.sss\'      2020-07-01T12:00:00.000
--to-earliest                           Reset offsets to earliest offset.      
--to-latest                             Reset offsets to latest offset.        
--to-offset <Long: offset>              Reset offsets to a specific offset.    
--topic <String: topic>                 The topic whose consumer group  

12、时间点重置

./kafka-consumer-groups.sh --bootstrap-server=192.168.2.4:9092,192.168.2.3:9092,192.168.2.2:9092 --group=gateway-calc-trip --topic=loc --execute --reset-offsets --to-datetime 2020-07-01T12:00:00.000   
#此处时间为UTC时间,需修改为提前8个小时,修改完成可命令行查看重置后的offset,然后指定这个offset及分区开始消费,查看当前消息时间,验证重置时间点是否正确

13、offset重置

./kafka-consumer-groups.sh  --bootstrap-server=10.111.30.3:9092,10.111.30.10:9092,10.111.30.5:9092 --topic=locationUp --group=jt809-down-xg --execute --reset-offsets --to-offset 359905139

14、offset重置(所有topic)

./kafka-consumer-groups.sh --bootstrap-server 10.111.30.4:9092,10.111.30.8:9092,10.111.30.9:9092 --group testPlatform2 --reset-offsets --all-topics --to-latest  --execute

15、kafka指定offset与partition导出消息

./kafka-console-consumer.sh --bootstrap-server 10.111.30.4:9092 --topic topic-upstreammsg-statistic  --offset 825000 --partition 0 >> log.log

16、修改主题级别的参数

bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

17、查看topic消息总数

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic
test-topic:0:0
test-topic:1:0
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic

test-topic:0:5500000
test-topic:1:5500000

-1 和 -2 分别表示获取某分区最新的位移和最早的位移,这两个位移值的差值就是这个分区当前的消息数,在这个例子中,差值是 500 万条

18、测试生产者性能脚本kafka-producer-perf-test.sh

$ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.
  1. 向指定主题发送了 1 千万条消息,每条消息大小是 1KB。该命令允许你在 producer-props 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等。
  2. 该命令的输出值得好好说一下。它会打印出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时。一般情况下,消息延时不是一个简单的数字,而是一组分布
  3. 在上面的输出中,99th 值是 604ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 604ms 以内。你完全可以把这个数据当作这个生产者对外承诺的 SLA。

19、测试消费者性能脚本kafka-consumer-perf-test.sh

$ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012

  虽然输出格式有所差别,但该脚本也会打印出消费者的吞吐量数据。比如本例中的 1723MB/s。有点令人遗憾的是,它没有计算不同分位数下的分布情况。因此,在实际使用过程中,这个脚本的使用率要比生产者性能测试脚本的使用率低。

版权声明:本文为wushaoyu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/wushaoyu/p/11486551.html