Python多线程与队列
Python多线程与Queue队列多线程在感官上类似于同时执行多个程序,虽然由于GIL的存在,在Python中无法实现线程的真正并行,但是对于某些场景,多线程仍不失为一个有效的处理方法:
1,不紧急的,无需阻塞主线程的任务,此时可以利用多线程在后台慢慢处理;
2,IO密集型操作,比如文件读写、用户输入和网络请求等,此时多线程可以近似达到甚至优于多进程的表现;
多线程的基本使用不再赘述,以下语法便可轻松实现:
1 def task(args1, args2): 2 pass 3 4 Thread( 5 target=task, 6 args=(args1, args2) 7 ).start()
这里我们重点关注线程通信。
假设有这么一种场景:有一批源数据,指定一个操作系数N,需要分别对其进行与N的加减乘除操作,并将结果汇总。
当然这里的加减乘除只是一种简单处理,在实际的生产环境中,它其实代表了一步较为复杂的业务操作,并包含了较多的IO处理。
自然我们想到可以开启多线程处理,那么紧接着的问题便是:如何划分线程,是根据处理步骤划分,还是根据源数据划分?
对于前者,我们把涉及的业务操作单独划分位一个线程,即有4个线程分别进行加减乘除的操作,显然上一个线程的结果是下一个线程的输入,这类似于流水线操作;
而后者则是把源数据分为若干份,每份启动一个线程进行处理,最终把结果汇总。一般来说,我们推荐第一种方式。因为在一个线程中完成所有的操作不如每步一个线程清晰明了,
尤其是在一些复杂的场景下,会加大单个线程的出错概率和测试难度。
那么我们将开辟4个线程,分别执行加减乘除操作。最后一个除法线程结束则任务完成:
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 from Queue import Queue 5 from threading import Thread 6 7 8 class NumberHandler(object): 9 def __init__(self, n): 10 self.n = n 11 12 def add(self, num): 13 return num + self.n 14 15 def subtract(self, num): 16 return num - self.n 17 18 def multiply(self, num): 19 return num * self.n * self.n 20 21 def divide(self, num): 22 return num / self.n 23 24 25 class ClosableQueue(Queue): 26 SENTINEL = object() 27 28 def close(self): 29 self.put(self.SENTINEL) 30 31 def __iter__(self): 32 while True: 33 item = self.get() 34 try: 35 if item is self.SENTINEL: 36 return 37 yield item 38 finally: 39 self.task_done() 40 41 42 class StoppableWorker(Thread): 43 def __init__(self, func, in_queue, out_queue): 44 super(StoppableWorker, self).__init__() 45 self.in_queue = in_queue 46 self.out_queue = out_queue 47 self.func = func 48 49 def run(self): 50 for item in self.in_queue: 51 result = self.func(item) 52 self.out_queue.put(result) 53 print self.func 54 55 56 if __name__ == '__main__': 57 source_queue = ClosableQueue() 58 add_queue = ClosableQueue() 59 subtract_queue = ClosableQueue() 60 multiply_queue = ClosableQueue() 61 divide_queue = ClosableQueue() 62 result_queue = ClosableQueue() 63 64 number_handler = NumberHandler(5) 65 66 threads = [ 67 StoppableWorker(number_handler.add, add_queue, subtract_queue), 68 StoppableWorker(number_handler.subtract, subtract_queue, multiply_queue), 69 StoppableWorker(number_handler.multiply, multiply_queue, divide_queue), 70 StoppableWorker(number_handler.divide, divide_queue, result_queue), 71 ] 72 73 for _thread in threads: 74 _thread.start() 75 76 for i in range(10): 77 add_queue.put(i) 78 79 add_queue.close() 80 add_queue.join() 81 print 'add job done...' 82 subtract_queue.close() 83 subtract_queue.join() 84 print 'subtract job done...' 85 multiply_queue.close() 86 multiply_queue.join() 87 print 'multiply job done...' 88 divide_queue.close() 89 divide_queue.join() 90 print 'divide job done...' 91 result_queue.close() 92 93 print "%s items finished, result: %s" % (result_queue.qsize(), result_queue) 94 95 for i in result_queue: 96 print i
运行结果:
线程执行日志:
总的结果:
可见线程交叉运行,但是任务却是顺序结束,这符合我们的预期。
值得注意的是,我们在ClosableQueue定义了一个close()方法,通过放入一个特殊的类变量SENTINEL告诉队列应该关闭。此外,由于直接加减乘除结果不变,因此我特意乘了两次来便于我们判断结果。
总结:
1. Queue是一种高效的任务处理方式,它可以把任务处理流程划分为若干阶段,并使用多条python线程来同时执行这些子任务;
2. Queue类具备阻塞式的队列操作、能够指定缓冲区尺寸,而且还支 持join方法,这使得开发者可以构建出健壮的流水线。