python中多线程,多进程,多协程概念及编程上的应用
1, 多线程
- 线程是进程的一个实体,是CPU进行调度的最小单位,他是比进程更小能独立运行的基本单位。
- 线程基本不拥有系统资源,只占用一点运行中的资源(如程序计数器,一组寄存器和栈),但是它可以与同属于一个进程的其他线程共享全部的资源。
- 提高程序的运行速率,上下文切换快,开销比较少,但是不够稳定,容易丢失数据,形成死锁。
直接上代码:
import time import threading # 函数1用时2秒 def fun1(): time.sleep(2) print(threading.current_thread().name, time.ctime()) # 函数2用时4秒 def fun2(): time.sleep(4) print(threading.current_thread().name, time.ctime()) # 函数3用时6秒 def fun3(): time.sleep(6) print(\'hello python\', time.ctime()) th1 = threading.Thread(target=fun1) th2 = threading.Thread(target=fun2) th3 = threading.Thread(target=fun3) th1.start() th2.start() th3.start()
打印结果:
Thread-1 Mon Jan 7 11:01:52 2019 Thread-2 Mon Jan 7 11:01:54 2019 hello python Mon Jan 7 11:01:56 2019
解析:从结果看出,他们同一时间 11:01:50开始执行,分别用了不同的时间结束
接着往下看,添加join阻塞线程
\'\'\'\'\'\'
th1.start()
th1.join()
th2.start()
th2.join()
th3.start()
th3.join()
打印结果:
Thread-1 Mon Jan 7 11:19:00 2019
Thread-2 Mon Jan 7 11:19:04 2019
hello python Mon Jan 7 11:19:10 2019
我们看到这三线程按顺序依次执行。
我们接着看看线程的方法使用:
threading.enumerate() #列举线程,返回列表,其中里面会有一条主线程 threading.activeCount() #查看线程运行个数 threading.current_thread().name #查看当前运行线程名称 join() #阻塞线程运行
我们接着看第二种开线程的方式:
import threading import time class MyThread(threading.Thread): def run(self): for i in range(3): time.sleep(1) msg = "I\'m "+self.name+\' @ \'+str(i) #name属性中保存的是当前线程的名字 print(msg) if __name__ == \'__main__\': t = MyThread() t.setName(\'yangzhenyu\') a = t.isAlive() print(a) print(t.getName()) t.start() b = t.isAlive() print(b)
打印结果:
False yanzghenyu True I\'m yanzghenyu @ 0 I\'m yanzghenyu @ 1 I\'m yanzghenyu @ 2
方法总结:
t.setName() #设置运行线程名称,不指定默认Thread-1 t.getName() #获取线程名称 t.isAlive() #判断线程是否运行,返回布尔类型
线程间共享全局变量:
import threading import time n = 100 def work01(): global n for i in range(3): n += 1 print(n) //103 def work02(): global n print(n) //103 print(n) //100 t1 = threading.Thread(target=work01) t1.start()
time.sleep(1)
t2 = threading.Thread(target=work02) t2.start()
关于线程锁
- 用threading.Lock()创建锁,用acquire()申请锁,每次只有一个线程获得锁,其他线程必须等此线程release()后才能获得锁
- RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。
- 注意:如果使用RLock,那么acquire和release必须成对出现,即同一线程中调用了n次acquire,必须调用n次的release才能真正释放所占用的琐
下面例子中我们用到的是Lock(),当加完锁之后,该方法同一时间内只能被一个线程调用。
import threading mylock=threading.Lock()#创建锁 num = 0 def add_num(name): global num while True: mylock.acquire()#申请锁也就是加锁 print(\'thread %s locked! num=%d\'%(name,num)) if num>=5: print(\'thread %s release! num=%d\'%(name,num)) mylock.release()#释放锁 return num += 1 print(\'thread %s release! num = %d\'%(name,num)) mylock.release() t1 = threading.Thread(target=add_num,args=(\'A\',)) t2 = threading.Thread(target=add_num,args=(\'B\',)) t1.start() t2.start()
打印结果:
thread A locked! num=0 thread A release! num = 1 thread A locked! num=1 thread A release! num = 2 thread A locked! num=2 thread A release! num = 3 thread A locked! num=3 thread A release! num = 4 thread A locked! num=4 thread A release! num = 5 thread A locked! num=5 thread A release! num=5 thread B locked! num=5 thread B release! num=5
cpu io密集型适合用多线程进行开发
关于进程:
- 进程是系统进行资源分配的最小单位,每个进程都有自己的独立内存空间,不用进程通过进程间通信来通信。
- 但是进程占据独立空间,比较重量级,所以上下文进程间的切换开销比较大,但是比较稳定安全。
进程创建:
第一种创建进程的方式:
from multiprocessing import Process import time import random import os def piao(name): print("%s is piaoping"%name) time.sleep(random.randint(0,1)) print("%s is piao end"%name) if __name__ == \'__main__\': print("CPU的个数是:%d"%os.cpu_count()) p1 = Process(target=piao,args=("alex",),name="进程1") print(p1.name) p1.start() print("父进程!") #执行速度要远快于建立新进程的时间
打印结果:
CPU的个数是:2 进程1 父进程! alex is piaoping alex is piao end
第二种创建进程的方式:
from multiprocessing import Process import time import random #继承Process类,并实现自己的run方法 class Piao(Process): def __init__(self,name): #必须调用父类的init方法 super().__init__() self.name = name def run(self): print("%s is piaoing"%self.name) time.sleep(random.randint(1,3)) print("%s is piaoeng"%self.name) if __name__ == \'__main__\': p1 = Piao("Alex") #开辟一个新的进程实际上就是执行本进程所对应的run()方法 p1.start() print("主进程!")
结果:
主进程! Alex is piaoing Alex is piaoeng
解析:join括号中不携带参数,表示父进程在这个位置要等待p1进程执行完成后,如果指定参数,也就是等待时间s,那么主进程将在这个时间内结束,
用is_active() 方法即可检测进程的状态,不加join() 返回True,表示进程还在进行。
进程的方法,
start() 启动进程实例(创建子进程); terminate():不管任务是否完成,立即终止; name: 当前进程实例别名,默认为Process-N,N为从1开始递增的整数; pid: 当前进程实例的PID值; os.getpid() is_alive(): 判断进程实例是否还在执行; join([timeout]):是否等待进程实例执行结束,或等待多少秒;
进程池:
在程序实际处理问题时,忙时会有成千上万个任务需要执行,闲时有零星任务,创建时需要消耗时间,销毁也需要时间,
即使开启成千上万个进程,操作系统也不能 让他同时执行。这里就用到了进程池,用于管理小块内存的申请与释放。
,
1,上代码:
from multiprocessing.pool import Pool from time import sleep def fun(a): sleep(1) print(a) if __name__ == \'__main__\': p = Pool() # 这里不加参数,但是进程池的默认大小,等于电脑CPU的核数 # 也是创建子进程的个数,也是每次打印的数字的个数 for i in range(10): p.apply_async(fun, args=(i,))
p.close() p.join() # 等待所有子进程结束,再往后执行 print("end")
2,callback 举例:
from multiprocessing import Process,Pool def func(i): i+=1 return i#普通进程处理过的数据返回给主进程p1 def call_back(p1): p1+=1 print(p1) if __name__ == \'__main__\': p = Pool() for i in range(10): p1 = p.apply_async(func,args=(i,),callback = call_back)#p调用普通进程并且接受其返回值,将返回值给要执行的回调函数处理 p.close() p.join()
解析: 1,p.apply ( func,args = ()) 同步的效率,也就是说池中的进程一个一个的去执行任务
p.apply_async( func,args = () , callback = None) : 异步的效率,也就是池中的进程一次性都去执行任务.
2,异步处理任务时 : 必须要加上close和join. 进程池的所有进程都是守护进程(主进程代码执行结束,守护进程就结束).
3,func : 进程池中的进程执行的任务函数
4,args : 可迭代对象性的参数,是传给任务函数的参数
5,callback : 回调函数,就是每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,
由回调函数进行进一步处理,回调函数只异步才有,同步没有.回调函数是父进程调用.
3. map( func,iterable) (该方法经常用到爬虫)
from multiprocessing import Pool def func(num): num += 1 print(num) return num if __name__ == \'__main__\': p = Pool(2) res = p.map(func,[i for i in range(100)]) # p.close()#map方法自带这两种功能 # p.join() print(\'主进程中map的返回值\',res)
func : 进程池中的进程执行的任务函数
iterable : 可迭代对象,是把可迭代对象那个中的每个元素一次传给任务函数当参数.
map方法自带close和join
进程间的通信:
1)队列
from multiprocessing import Queue,Process import os,time,random #添加数据函数 def proc_write(queue,urls): print("进程(%s)正在写入..."%(os.getpid())) for url in urls: queue.put(url) print("%s被写入到队列中"%(url)) time.sleep(random.random()*3) #读取数据函数 def proc_read(queue): print("进程(%s)正在读取..."%(os.getpid())) while True: url = queue.get() print("从队列中提取到:%s"%(url)) if __name__ =="__main__":
queue = Queue() proc_writer1 = Process(target=proc_write,args=(queue,["ur1","ur2","ur3","ur4"])) proc_writer2 = Process(target=proc_write,args=(queue,["ur5","ur6","ur7","ur8"])) proc_reader = Process(target=proc_read,args=(queue,)) proc_writer1.start() proc_writer1.join() proc_writer2.start() proc_writer2.join() proc_reader.start() proc_reader.terminate()
生产者与消费者模式(线程间的通信):
from queue import Queue import threading,time class Producer(threading.Thread): def run(self): global queue count = 0 while True: if queue.qsize() < 1000: for i in range(100): count = count +1 msg = \'生成产品\'+str(count) queue.put(msg) print(msg) time.sleep(0.5) class Consumer(threading.Thread): def run(self): global queue while True: if queue.qsize() > 100: for i in range(3): msg = self.name + \'消费了 \'+queue.get() print(msg) time.sleep(1) if __name__ == \'__main__\': queue = Queue() for i in range(500): queue.put(\'初始产品\'+str(i)) for i in range(2): p = Producer() p.start() for i in range(5): c = Consumer() c.start()
2) 进程间的通信(管道)
from multiprocessing import Pipe,Process import random,time,os def proc_send(pipe,urls): for url in urls: print("进程(%s)发送:%s"%(os.getpid(),url)) pipe.send(url) time.sleep(random.random()) def proc_recv(pipe): while True: print("进程(%s)接收到:%s"%(os.getpid(),pipe.recv())) time.sleep(random.random()) if __name__ == "__main__": pipe = Pipe() p1 = Process(target=proc_send,args=(pipe[0],["url_"+str(i) for i in range(10)],)) p2 = Process(target=proc_recv,args=(pipe[1],)) p1.start() p2.start() p1.join() p2.terminate()
解析:
pipe用于两个进程间的通信,两个进程分别位于管道的两端,Pipe方法返回(conn1,conn2)代表一个管道的两端,
Pipe方法有dumplex参数,若该参数为True,管道为全双工模式,
若为Fasle,conn1只负责接收消息,conn2只负责发送消息.send和recv方法分别是发送和接收消息的方法
协程:
协程:是更小的执行单位,是一种轻量级的线程,协程的切换只是单纯的操作CPU的上下文,所以切换速度特别快,且耗能小。
gevent是第三方库,通过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:
from gevent import monkey monkey.patch_all() # 用来在运行时动态修改已有的代码,而不需要修改原始代码。 import gevent import requests def f(url): print(\'GET: %s\' % url) html = requests.get(url).text print(url, len(html)) gevent.joinall([ gevent.spawn(f, \'http://i.maxthon.cn/\'), # 先执行这个函数,发送请求,等待的时候发送第二个请求 gevent.spawn(f, \'http://www.jianshu.com/u/3cfeb3395a95\'), gevent.spawn(f, \'http://edu.51cto.com/?jydh\')])
运行结果:
GET: http://i.maxthon.cn/ GET: http://www.jianshu.com/u/3cfeb3395a95 GET: http://edu.51cto.com/?jydh http://i.maxthon.cn/ 461786 http://edu.51cto.com/?jydh 353858 http://www.jianshu.com/u/3cfeb3395a95 597
从结果看,3个网络操作是并发执行的,而且结束顺序不同,但只有一个线程。
使用gevent,可以获得极高的并发性能,但gevent只能在Unix/Linux下运行,在Windows下不保证正常安装和运行。