Kafka 入门(四)-- Python Kafka Client 性能测试
一、前言
由于工作原因使用到了 Kafka,而现有的代码并不能满足性能需求,所以需要开发高效读写 Kafka 的工具,本文是一个 Python Kafka Client 的性能测试记录,通过本次测试,可以知道选用什么第三方库的性能最高,选用什么编程模型开发出来的工具效率最高。
二、第三方库性能测试
1.第三方库
此次测试的是三个主要的 Python Kafka Client:pykafka、kafka-python 和 confluent-kafka,具体介绍见官网:
- pykafka:https://pypi.org/project/pykafka/
- kafka-python:https://pypi.org/project/kafka-python/
- confluent_kafka:https://pypi.org/project/confluent-kafka/
2.测试环境
此次测试使用的 Python 版本是2.7,第三方库的版本为:
- pykafka:2.8.0
- kafka-python:2.0.2
- confluent-kafka:1.5.0
使用的数据总量有50万,每条数据大小为2KB,总共为966MB。
3.测试过程
(1)Kafka Producer 测试
分别使用 pykafka、kafka-python 和 confluent-kafka 实例化一个 Kafka 的 Producer 对象,然后调用相应的 produce 方法将数据推送给 Kafka,数据总条数为50万,比较三个库所耗费的时间,并计算每秒钟可以推送的数据条数和大小,比较得出性能最优的。
代码示例(以 pykafka 为例):
- 1 import sys
- 2 from datetime import datetime
- 3 from pykafka import KafkaClient
- 4
- 5
- 6 class KafkaProducerTool():
- 7 def __init__(self, broker, topic):
- 8 client = KafkaClient(hosts=broker)
- 9 self.topic = client.topics[topic]
- 10 self.producer = self.topic.get_producer()
- 11
- 12 def send_msg(self, msg):
- 13 self.producer.produce(msg)
- 14
- 15
- 16 if __name__ == \'__main__\':
- 17 producer = KafkaProducerTool(broker, topic)
- 18 print(datetime.now())
- 19 for line in sys.stdin:
- 20 producer.send_msg(line.strip())
- 21 producer.producer.stop()
- 22 print(datetime.now())
(2)Kafka Consumer 测试
分别使用 pykafka、kafka-python 和 confluent-kafka 实例化一个 Kafka 的 Consumer 对象,然后调用相应的 consume 方法从 Kafka 中消费数据,要消费下来的数据总条数为50万,比较三个库所耗费的时间,并计算每秒钟可以消费的数据条数和大小,比较得出性能最优的。
代码示例(以 pykafka 为例):
- 1 from datetime import datetime
- 2 from pykafka import KafkaClient
- 3
- 4
- 5 class KafkaConsumerTool():
- 6 def __init__(self, broker, topic):
- 7 client = KafkaClient(hosts=broker)
- 8 self.topic = client.topics[topic]
- 9 self.consumer = self.topic.get_simple_consumer()
- 10
- 11 def receive_msg(self):
- 12 count = 0
- 13 print(datetime.now())
- 14 while True:
- 15 msg = self.consumer.consume()
- 16 if msg:
- 17 count += 1
- 18 if count == 500000:
- 19 print(datetime.now())
- 20 return
- 21
- 22
- 23 if __name__ == \'__main__\':
- 24 consumer = KafkaConsumerTool(broker, topic)
- 25 consumer.receive_msg()
- 26 consumer.consumer.stop()
4.测试结果
- Kafka Producer 测试结果:
总耗时/秒 | 每秒数据量/MB | 每秒数据条数 | |
confluent_kafka | 35 | 27.90 | 14285.71 |
pykafka | 50 | 19.53 | 10000 |
kafka-python | 532 | 1.83 | 939.85 |
- Kafka Consumer 测试结果:
总耗时/秒 | 每秒数据量/MB | 每秒数据条数 | |
confluent_kafka | 39 | 25.04 | 12820.51 |
kafka-python | 52 | 18.78 | 9615.38 |
pykafka | 335 | 2.92 | 1492.54 |
5.测试结论
经过测试,在此次测试的三个库中,生产消息的效率排名是:confluent-kafka > pykafka > kafka-python,消费消息的效率排名是:confluent-kafka > kafka-python > pykafka,由此可见 confluent-kafka 的性能是其中最优的,因而选用这个库进行后续开发。
三、多线程模型性能测试
1.编程模型
经过前面的测试已经知道 confluent-kafka 这个库的性能是很优秀的了,但如果还需要更高的效率,应该怎么办呢?当单线程(或者单进程)不能满足需求时,我们很容易想到使用多线程(或者多进程)来增加并发提高效率,考虑到线程的资源消耗比进程少,所以打算选用多线程来进行开发。那么多线程消费 Kafka 有什么实现方式呢?我想到的有两种:
- 一个线程实现一个 Kafka Consumer,最多可以有 n 个线程同时消费 Topic(其中 n 是该 Topic 下的分区数量);
- 多个线程共用一个 Kafka Consumer,此时也可以实例化多个 Consumer 同时消费。
对比这两种多线程模型:
- 模型1实现方便,可以保证每个分区有序消费,但 Partition 数量会限制消费能力;
- 模型2并发度高,可扩展能力强,消费能力不受 Partition 限制。
2.测试过程
(1)多线程模型1
测试代码:
- 1 import time
- 2 from threading import Thread
- 3 from datetime import datetime
- 4 from confluent_kafka import Consumer
- 5
- 6
- 7 class ChildThread(Thread):
- 8 def __init__(self, name, broker, topic):
- 9 Thread.__init__(self, name=name)
- 10 self.con = KafkaConsumerTool(broker, topic)
- 11
- 12 def run(self):
- 13 self.con.receive_msg()
- 14
- 15
- 16 class KafkaConsumerTool:
- 17 def __init__(self, broker, topic):
- 18 config = {
- 19 \'bootstrap.servers\': broker,
- 20 \'session.timeout.ms\': 30000,
- 21 \'auto.offset.reset\': \'earliest\',
- 22 \'api.version.request\': False,
- 23 \'broker.version.fallback\': \'2.6.0\',
- 24 \'group.id\': \'test\'
- 25 }
- 26 self.consumer = Consumer(config)
- 27 self.topic = topic
- 28
- 29 def receive_msg(self):
- 30 self.consumer.subscribe([self.topic])
- 31 print(datetime.now())
- 32 while True:
- 33 msg = self.consumer.poll(timeout=30.0)
- 34 print(msg)
- 35
- 36
- 37 if __name__ == \'__main__\':
- 38 thread_num = 10
- 39 threads = [ChildThread("thread_" + str(i + 1), broker, topic) for i in range(thread_num)]
- 40
- 41 for i in range(thread_num):
- 42 threads[i].setDaemon(True)
- 43 for i in range(thread_num):
- 44 threads[i].start()
因为我使用的 Topic 共有8个分区,所以我分别测试了线程数在5个、8个和10个时消费50万数据所需要的时间,并计算每秒可消费的数据条数。
(2)多线程模型2
测试代码:
- 1 import time
- 2 from datetime import datetime
- 3 from confluent_kafka import Consumer
- 4 from threadpool import ThreadPool, makeRequests
- 5
- 6
- 7 class KafkaConsumerTool:
- 8 def __init__(self, broker, topic):
- 9 config = {
- 10 \'bootstrap.servers\': broker,
- 11 \'session.timeout.ms\': 30000,
- 12 \'auto.offset.reset\': \'earliest\',
- 13 \'api.version.request\': False,
- 14 \'broker.version.fallback\': \'2.6.0\',
- 15 \'group.id\': \'mini-spider\'
- 16 }
- 17 self.consumer = Consumer(config)
- 18 self.topic = topic
- 19
- 20 def receive_msg(self, x):
- 21 self.consumer.subscribe([self.topic])
- 22 print(datetime.now())
- 23 while True:
- 24 msg = self.consumer.poll(timeout=30.0)
- 25 print(msg)
- 26
- 27
- 28 if __name__ == \'__main__\':
- 29 thread_num = 10
- 30 consumer = KafkaConsumerTool(broker, topic)
- 31 pool = ThreadPool(thread_num)
- 32 for r in makeRequests(consumer.receive_msg, [i for i in range(thread_num)]):
- 33 pool.putRequest(r)
- 34 pool.wait()
主要使用 threadpool 这个第三方库来实现线程池,此处当然也可以使用其他库来实现,这里我分别测试了线程数量在5个和10个时消费50万数据所需要的时间,并计算每秒可消费的数据条数。
3.测试结果
- 多线程模型1
总数据量/万 | 线程数量 | 总耗时/秒 | 每秒数据条数 |
50 | 5 | 27 | 18518.51 |
50 | 8 | 24 | 20833.33 |
50 | 10 | 26 | 19230.76 |
- 多线程模型2
总数据量/万 | 线程数量 | 总耗时/秒 | 每秒数据条数 |
50 | 5 | 17 | 29411.76 |
50 | 10 | 13 | 38461.53 |
4.测试结论
使用多线程可以有效提高 Kafka 的 Consumer 消费数据的效率,而选用线程池共用一个 KafkaConsumer 的消费方式的消费效率更高。