python 线程池
Python 的线程池模式是手动实现的,并没有现成的实现。因此我猜,线程池并不能直接让代码变得清晰,它更多是概念性的;我觉得直接创建线程更加简单粗暴,也能很好地定位错误信息。不过,还是应该去了解它的实现原理。
import sys
import threading
import time
import urllib
from threading import Thread

try:
    import Queue  # Python 2 name
except ImportError:
    import queue as Queue  # Python 3 renamed the module; keep the old alias


# working thread
class Worker(Thread):
    """Daemon thread that repeatedly takes a job from a queue and runs it.

    Each job is a ``(func, args, kwds)`` tuple taken from ``workQueue``.
    The job's return value is printed and put on ``resultQueue``.  The
    thread starts itself at the end of ``__init__``.
    """

    worker_count = 0  # workers created so far; also the next worker id
    timeout = 1       # seconds to wait on an empty queue before exiting

    def __init__(self, workQueue, resultQueue, **kwds):
        """Create the worker, register its id and start it immediately.

        Args:
            workQueue: queue of ``(func, args, kwds)`` job tuples.
            resultQueue: queue that receives each job's return value.
            **kwds: forwarded unchanged to ``Thread.__init__``.
        """
        Thread.__init__(self, **kwds)
        self.id = Worker.worker_count
        Worker.worker_count += 1
        # Daemon threads do not keep the interpreter alive at exit.
        self.daemon = True
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.start()

    def run(self):
        """The get-some-work, do-some-work main loop of worker threads.

        The loop ends once the work queue has stayed empty for
        ``Worker.timeout`` seconds.  A job that raises is reported on
        stdout and skipped; it puts nothing on the result queue and the
        worker moves on to the next job.
        """
        while True:
            # Only the queue read may end the loop: a Queue.Empty raised
            # from inside a job must not be mistaken for "no more work".
            try:
                func, args, kwds = self.workQueue.get(timeout=Worker.timeout)
            except Queue.Empty:
                break

            try:
                res = func(*args, **kwds)
            except Exception:
                # Report and keep serving; letting the exception escape
                # would end this thread and strand the remaining jobs.
                print('worker[%2d] %s' % (self.id, sys.exc_info()[:2]))
                continue

            print("worker[%2d]: %s" % (self.id, str(res)))
            self.resultQueue.put(res)


class WorkerManager(object):
    """Owns a work queue, a result queue and a fixed set of Worker threads.

    Workers are created (and therefore started) in the constructor and
    exit on their own after the work queue has been empty for
    ``Worker.timeout`` seconds, so jobs should be added right after
    construction.
    """

    def __init__(self, num_of_workers=10, timeout=2):
        """Create the queues and start ``num_of_workers`` workers.

        Args:
            num_of_workers: number of Worker threads to start.
            timeout: stored on the instance as ``self.timeout``; no method
                of this class reads it.
        """
        self.workQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.workers = []
        self.timeout = timeout
        self._recruitThreads(num_of_workers)

    def _recruitThreads(self, num_of_workers):
        """Start ``num_of_workers`` workers and remember them."""
        for _ in range(num_of_workers):
            self.workers.append(Worker(self.workQueue, self.resultQueue))

    def wait_for_complete(self):
        """Block until every worker thread has terminated.

        ``self.workers`` is emptied as the workers are joined.
        """
        # ...then, wait for each of them to terminate.  join() without a
        # timeout only returns once the thread is dead, so no liveness
        # re-check is needed afterwards.
        while self.workers:
            self.workers.pop().join()
        print("All jobs are are completed.")

    def add_job(self, callable, *args, **kwds):
        """Queue ``callable(*args, **kwds)`` for execution by a worker."""
        self.workQueue.put((callable, args, kwds))

    def get_result(self, *args, **kwds):
        """Return the next job result.

        All arguments are forwarded to the result queue's ``get`` (for
        example ``block`` / ``timeout``).
        """
        return self.resultQueue.get(*args, **kwds)
以上是线程池(Worker 与 WorkerManager)的实现,将其保存为 thread_pool.py。下面是调用的实例:
import socket
import sys
import time
from datetime import datetime

try:
    import urllib2  # Python 2 name
except ImportError:
    import urllib.request as urllib2  # Python 3 location of urlopen

from thread_pool import WorkerManager


def main():
    """Fetch a fixed set of URLs concurrently through a WorkerManager.

    One ``do_get_con`` job is queued per entry of the URL table, then the
    function blocks until the pool's workers have finished.
    """
    # NOTE(review): the "google_*" entries point at baidu.com URLs in the
    # original listing; they are kept as-is.
    url_list = {
        "sina": "http://www.sina.com.cn",
        "sohu": "http://www.sohu.com",
        "yahoo": "http://www.yahoo.com",
        "xiaonei": "http://www.xiaonei.com",
        "qihoo": "http://www.qihoo.com",
        "laohan": "http://www.laohan.org",
        "eyou": "http://www.eyou.com",
        "chinaren": "http://www.chinaren.com",
        "douban": "http://www.douban.com",
        "163": "http://www.163.com",
        "daqi": "http://www.daqi.com",
        "qq": "http://www.qq.com",
        "baidu_1": "http://www.baidu.com/s?wd=asdfasdf",
        "baidu_2": "http://www.baidu.com/s?wd=dddddddf",
        "google_1": "http://www.baidu.com/s?wd=sadfas",
        "google_2": "http://www.baidu.com/s?wd=sadflasd",
        "hainei": "http://www.hainei.com",
        "microsoft": "http://www.microsoft.com",
        "wlzuojia": "http://www.wlzuojia.com",
    }

    # Process-wide default so a slow host cannot block a worker forever.
    socket.setdefaulttimeout(10)
    print('start testing')
    wm = WorkerManager(50)
    for url_name, url_link in url_list.items():
        wm.add_job(do_get_con, url_name, url_link)
    wm.wait_for_complete()
    print('end testing')


def do_get_con(url_name, url_link):
    """Download ``url_link`` and save the body to ``/tmp/ttt/<url_name>``.

    Best effort: any failure (network error, missing output directory,
    ...) is reported on stderr and the function returns normally, so one
    bad URL never takes down the worker thread running it.

    Args:
        url_name: short name used as the output file name.
        url_link: URL to fetch.
    """
    try:
        fd = urllib2.urlopen(url_link)
        try:
            data = fd.read()
        finally:
            # Release the connection even when read() fails.
            fd.close()
        # The response body is raw bytes, so write it in binary mode.
        with open("/tmp/ttt/%s" % url_name, "wb") as f_hand:
            f_hand.write(data)
    except Exception as e:
        sys.stderr.write(
            "do_get_con: %s (%s) failed: %r\n" % (url_name, url_link, e)
        )


if __name__ == "__main__":
    main()