本章讨论python3.2引入的concurrent.futures模块。future是中文名叫期物。期物是一种对象,表示异步执行的操作

在很多任务中,特别是处理网络I/O。需要使用并发,因为网络有很高的延迟。所以为了不浪费CPU周期去等待,最好在收到网络响应之前做些其他的事。

首先来看下并发和非并发的两个脚本,来对比下各自的运行效率。在这个程序中,我们通过脚本去网站下载各个国家的国旗。网址是http://flupy.org/data/flags/cn/cn.gif

这里http://flupy.org/是基本URL,后面接/data/flags/国家名称/国家名称.gif

首先来看下顺序下载的代码:

country=(\’CN IN US ID BR PK NG BD JP MX PH VN ET EG DE IR TR CD FR\’).split()

 

BASE_URL=\’http://flupy.org/data/flags\’

 

DEST_URL=\’downloads/\’

 

def save_flag(img,filename):

    path=os.path.join(DEST_URL,filename)

    with open(path,\’wb\’) as f:

        f.write(img)

 

def get_flag(cc):

    url=\'{}/{cc}/{cc}.gif\’.format(BASE_URL,cc=cc.lower())

    resp=requests.get(url)

    return resp.content

 

def show(text):

    print(text)

    sys.stdout.flush()

 

def download_many(cc_list):

    for cc in sorted(cc_list):

        img=get_flag(cc)

        show(cc)

        save_flag(img,cc.lower()+\’.gif\’)

    return len(cc_list)

 

def main(download_many):

    t1=time.time()

    count=download_many(country)

    elapsed=time.time()-t1

    msg=\’\n{} flags downloaded in {:.2f}s\’

    print(msg.format(count,elapsed))

 

 

 

if __name__==”__main__”:

    main(download_many)

 

代码中通过requests对图片进行下载并且保存到downloads文件夹下面。并且在main中统计代码运行的时间。运行结果如下:

/usr/bin/python3.6 /home/zhf/py_prj/function_test/test.py

BD

BR

CD

CN

DE

EG

ET

FR

ID

IN

IR

JP

MX

NG

PH

PK

TR

US

VN

19 flags downloaded in 15.29s

下载了19个图片总共花费15.29秒。接下来我们用concurrent.futures模块来对代码进行改造。在这里添加download_onedownload_many_futures两个函数

在ThreadPoolExecutor中设置最大运行的线程max_workers为3

executor.submit中传入单个的回调函数和参数

future.result()返回的是每个线程运行完后的结果,在这里就是download_one的返回值

futures.as_completed是一个迭代器,在期物运行结束后产出期物

def download_one(cc):

    image=get_flag(cc)

    show(cc)

    save_flag(image,cc.lower()+\’.gif\’)

    return cc

 

def download_many_futures(cc_list):

    with futures.ThreadPoolExecutor(max_workers=3) as executor:

        to_do=[]

        for cc in sorted(cc_list):

            future=executor.submit(download_one,cc)

            to_do.append(future)

            msg=\’Scheduled for {}:{}\’

            print(msg.format(cc,future))

 

        results=[]

        for future in futures.as_completed(to_do):

            res=future.result()

            msg=\'{} result:{!r}\’

            print(msg.format(future,res))

            results.append(res)

return len(results)

来看下运行的结果:总共耗时6.72秒,比之前的顺序下载节省了一半的时间

Scheduled for BD:<Future at 0x7f51f9c7e908 state=running>

Scheduled for BR:<Future at 0x7f51f9c7ef60 state=running>

Scheduled for CD:<Future at 0x7f51f8a20518 state=running>

Scheduled for CN:<Future at 0x7f51f8a20ef0 state=pending>

Scheduled for DE:<Future at 0x7f51f8a20f60 state=pending>

Scheduled for EG:<Future at 0x7f51f8a2c048 state=pending>

Scheduled for ET:<Future at 0x7f51f8a2c0f0 state=pending>

Scheduled for FR:<Future at 0x7f51f8a2c198 state=pending>

Scheduled for ID:<Future at 0x7f51f8a2c278 state=pending>

Scheduled for IN:<Future at 0x7f51f8a2c358 state=pending>

Scheduled for IR:<Future at 0x7f51f8a2c438 state=pending>

Scheduled for JP:<Future at 0x7f51f8a2c518 state=pending>

Scheduled for MX:<Future at 0x7f51f8a2c5f8 state=pending>

Scheduled for NG:<Future at 0x7f51f8a2c6d8 state=pending>

Scheduled for PH:<Future at 0x7f51f8a2c7b8 state=pending>

Scheduled for PK:<Future at 0x7f51f8a2c898 state=pending>

Scheduled for TR:<Future at 0x7f51f8a2c978 state=pending>

Scheduled for US:<Future at 0x7f51f8a2ca58 state=pending>

Scheduled for VN:<Future at 0x7f51f8a2cb38 state=pending>

BD

<Future at 0x7f51f9c7e908 state=finished returned str> result:\’BD\’

BR

<Future at 0x7f51f9c7ef60 state=finished returned str> result:\’BR\’

CD

<Future at 0x7f51f8a20518 state=finished returned str> result:\’CD\’

CN

<Future at 0x7f51f8a20ef0 state=finished returned str> result:\’CN\’

DE

<Future at 0x7f51f8a20f60 state=finished returned str> result:\’DE\’

EG

<Future at 0x7f51f8a2c048 state=finished returned str> result:\’EG\’

FR

<Future at 0x7f51f8a2c198 state=finished returned str> result:\’FR\’

ID

ET

<Future at 0x7f51f8a2c0f0 state=finished returned str> result:\’ET\’

<Future at 0x7f51f8a2c278 state=finished returned str> result:\’ID\’

IN

<Future at 0x7f51f8a2c358 state=finished returned str> result:\’IN\’

JP

<Future at 0x7f51f8a2c518 state=finished returned str> result:\’JP\’

IR

<Future at 0x7f51f8a2c438 state=finished returned str> result:\’IR\’

NG

<Future at 0x7f51f8a2c6d8 state=finished returned str> result:\’NG\’

MX

<Future at 0x7f51f8a2c5f8 state=finished returned str> result:\’MX\’

PK

<Future at 0x7f51f8a2c898 state=finished returned str> result:\’PK\’

PH

<Future at 0x7f51f8a2c7b8 state=finished returned str> result:\’PH\’

TR

<Future at 0x7f51f8a2c978 state=finished returned str> result:\’TR\’

US

<Future at 0x7f51f8a2ca58 state=finished returned str> result:\’US\’

VN

<Future at 0x7f51f8a2cb38 state=finished returned str> result:\’VN\’

 

19 flags downloaded in 6.72s。如果我们将max_workers设置为更大的值,比如设置为10, 得到的运行时间将会更快。通过运行的结果最快的时候达到1.9秒。

 

我们都知道python有全局解释器锁GIL,一次只允许使用一个线程执行python字节码。因此一个Python进程通过不能同时使用多个CPU核。关于GIL的解释可以参考http://cenalulu.github.io/python/gil-in-python/这篇帖子。

那么如果我们有多个CPU(os.cpu_count可以查看有多少个CPU),我们该怎么利用呢。这里就要用到futures.ProcessPoolExecutorProcessPoolExecutor将工作分配给多个python进程处理。因此如果需要做CPU密集型处理,使用这个模块能够绕开GIL,利用所有可用的CPU核。我们将代码修改为ProcessPoolExecutor来执行看下结果

19 flags downloaded in 4.92s。这个比设置

futures.ThreadPoolExecutor(max_workers=10)的时候还要慢一些。这是由于我的电脑只有4CPU核,因此限制只能4个并发下载。但是线程版本使用的是10个线程。在用线程运行的时候,所有阻塞型的I/O函数都会释放GIL,允许其他线程运行。time.sleep()也会释放GIL,因此尽管有GILpython的线程还是能发挥作用。

在一章中我们将介绍asynchio包处理并发

 

版权声明:本文为zhanghongfeng原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/zhanghongfeng/p/8570630.html