Kafka常用命令
Kafka常用命令
日常进程kafka运维操作时,除了借助kafka-manager图形工具查看kafka信息时,我们还可以通过kafka提供的命令工具来进行kafka的操作与查看。
1、查看所有topic
./kafka-topics.sh --zookeeper=10.111.30.3:2181,10.111.30.10:2181,10.111.30.5:2181 topic --list
2、删除某个topic
./kafka-topics.sh --zookeeper=10.111.30.3:2181,10.111.30.10:2181,10.111.30.5:2181 --topic test --delete
3、命令行消费某个topic的消息
./kafka-console-consumer.sh --bootstrap-server=10.111.30.3:9092 --topic=loc --from-beginning #从头开始消费
4、kafka重置分组已经消费的偏移量offest
./kafka-consumer-groups.sh --bootstrap-server=10.111.30.10:9092,10.111.30.5:9092,10.111.30.3:9092 --execute --reset-offsets --topic=loc_topic1 --group=testPlatform-local-pha --to-earliest
5、命令行创建主题,指定该主题的定时清理时间
./kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1 --config retention.ms=172800000
6、查看topic属性
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic access
7、查看分组所属topic的消费情况
./kafka-consumer-groups.sh --bootstrap-server=10.2.48.31:9092,10.2.32.26:9092,10.2.16.25:9092 --group=testPlatform --describe
8、topic增加分区
./kafka-topics.sh --alter --zookeeper 192.168.119.131:2181 --topic yqtopic1 --partitions 12 #分区数增加到12个,会引起rebalance操作
9、创建主题(必须指定分区和副本,否则会报错)
./kafka-topics.sh --zookeeper=10.111.30.3:2181,10.111.30.10:2181,10.111.30.5:2181 --create --topic TEST --partitions 1 -replication-factor 1
10、指定topic创建消费者分组
./kafka-console-consumer.sh --bootstrap-server=10.111.30.3:9092,10.111.30.10:9092,10.111.30.5:9092 --topic TEST --consumer-property group.id=test1
11、将某个consumer group 的某个topic或所有topic重置到某个时间点
相关格式:
--to-current Reset offsets to current offset. --to-datetime <String: datetime> Reset offsets to offset from datetime. Format: \'YYYY-MM-DDTHH:mm:SS.sss\' 2020-07-01T12:00:00.000 --to-earliest Reset offsets to earliest offset. --to-latest Reset offsets to latest offset. --to-offset <Long: offset> Reset offsets to a specific offset. --topic <String: topic> The topic whose consumer group
12、时间点重置
./kafka-consumer-groups.sh --bootstrap-server=192.168.2.4:9092,192.168.2.3:9092,192.168.2.2:9092 --group=gateway-calc-trip --topic=loc --execute --reset-offsets --to-datetime 2020-07-01T12:00:00.000 #此处时间为UTC时间,需修改为提前8个小时,修改完成可命令行查看重置后的offset,然后指定这个offset及分区开始消费,查看当前消息时间,验证重置时间点是否正确
13、offset重置
./kafka-consumer-groups.sh --bootstrap-server=10.111.30.3:9092,10.111.30.10:9092,10.111.30.5:9092 --topic=locationUp --group=jt809-down-xg --execute --reset-offsets --to-offset 359905139
14、offset重置(所有topic)
./kafka-consumer-groups.sh --bootstrap-server 10.111.30.4:9092,10.111.30.8:9092,10.111.30.9:9092 --group testPlatform2 --reset-offsets --all-topics --to-latest --execute
15、kafka指定offset与partition导出消息
./kafka-console-consumer.sh --bootstrap-server 10.111.30.4:9092 --topic topic-upstreammsg-statistic --offset 825000 --partition 0 >> log.log
16、修改主题级别的参数
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
17、查看topic消息总数
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic test-topic:0:0 test-topic:1:0 $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic test-topic:0:5500000 test-topic:1:5500000 -1 和 -2 分别表示获取某分区最新的位移和最早的位移,这两个位移值的差值就是这个分区当前的消息数,在这个例子中,差值是 500 万条
18、测试生产者性能脚本kafka-producer-perf-test.sh
$ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4 2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency. 4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency. 10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.
- 向指定主题发送了 1 千万条消息,每条消息大小是 1KB。该命令允许你在 producer-props 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等。
- 该命令的输出值得好好说一下。它会打印出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时。一般情况下,消息延时不是一个简单的数字,而是一组分布
- 在上面的输出中,99th 值是 604ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 604ms 以内。你完全可以把这个数据当作这个生产者对外承诺的 SLA。
19、测试消费者性能脚本kafka-consumer-perf-test.sh
$ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec 2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012
虽然输出格式有所差别,但该脚本也会打印出消费者的吞吐量数据。比如本例中的 1723MB/s。有点令人遗憾的是,它没有计算不同分位数下的分布情况。因此,在实际使用过程中,这个脚本的使用率要比生产者性能测试脚本的使用率低。