1 测量函数运行时间
  1. import time
  2. def profile(func):
  3. def wrapper(*args, **kwargs):
  4. import time
  5. start = time.time()
  6. func(*args, **kwargs)
  7. end = time.time()
  8. print \'COST: {}\'.format(end - start)
  9. return wrapper
  10. @profile
  11. def fib(n):
  12. if n<= 2:
  13. return 1
  14. return fib(n-1) + fib(n-2)
  15. fib(35)

 


2 启动多个线程,并等待完成
 
2.1 使用threading.enumerate()
  1. import threading
  2. for i in range(2):
  3. t = threading.Thread(target=fib, args=(35,))
  4. t.start()
  5. main_thread = threading.currentThread()
  6. for t in threading.enumerate():
  7. if t is main_thread:
  8. continue
  9. t.join()

 

2.2 先保存启动的线程
  1. threads = []
  2. for i in range(5):
  3. t = Thread(target=foo, args=(i,))
  4. threads.append(t)
  5. t.start()
  6. for t in threads:
  7. t.join()
 

3 使用信号量,限制同时能有几个线程访问临界区
  1. from threading import Semaphore
  2. import time
  3. sema = Semaphore(3)
  4. def foo(tid):
  5. with sema:
  6. print(\'{} acquire sema\'.format(tid))
  7. wt = random() * 2
  8. time.sleep(wt)
  9. print(\'{} release sema\'.format(tid))

 


4 锁,相当于信号量为1的情况
  1. from threading import Thread Lock
  2. value = 0
  3. lock = Lock()
  4. def getlock():
  5. global lock
  6. with lock:
  7. new = value + 1
  8. time.sleep(0.001)
  9. value = new

 

 

5 可重入锁RLock
    acquire() 可以不被阻塞的被同一个线程调用多次,release()需要和acquire()调用次数匹配才能释放锁

6 条件 Condition
一个线程发出信号,另一个线程等待信号
常用于生产者-消费者模型
  1. import time
  2. import threading
  3. def consumer(cond):
  4. t = threading.currentThread()
  5. with cond:
  6. cond.wait()
  7. print("{}: Resource is available to sonsumer".format(t.name))
  8. def producer(cond):
  9. t = threading.currentThread()
  10. with cond:
  11. print("{}: Making resource available".format(t.name))
  12. cond.notifyAll()
  13. condition = threading.Condition()
  14. c1 = threading.Thread(name=\'c1\', target=consumer, args=(condition,))
  15. c2 = threading.Thread(name=\'c2\', target=consumer, args=(condition,))
  16. p = threading.Thread(name=\'p\', target=producer, args=(condition,))
  17. c1.start()
  18. c2.start()
  19. p.start()

 

 

7 事件 Event
感觉和Condition 差不多
  1. import time
  2. import threading
  3. from random import randint
  4. TIMEOUT = 2
  5. def consumer(event, l):
  6. t = threading.currentThread()
  7. while 1:
  8. event_is_set = event.wait(TIMEOUT)
  9. if event_is_set:
  10. try:
  11. integer = l.pop()
  12. print \'{} popped from list by {}\'.format(integer, t.name)
  13. event.clear() # 重置事件状态
  14. except IndexError: # 为了让刚启动时容错
  15. pass
  16. def producer(event, l):
  17. t = threading.currentThread()
  18. while 1:
  19. integer = randint(10, 100)
  20. l.append(integer)
  21. print \'{} appended to list by {}\'.format(integer, t.name)
  22. event.set() # 设置事件
  23. time.sleep(1)
  24. event = threading.Event()
  25. l = []
  26. threads = []
  27. for name in (\'consumer1\', \'consumer2\'):
  28. t = threading.Thread(name=name, target=consumer, args=(event, l))
  29. t.start()
  30. threads.append(t)
  31. p = threading.Thread(name=\'producer1\', target=producer, args=(event, l))
  32. p.start()
  33. threads.append(p)
  34. for t in threads:
  35. t.join()

 

 

8 线程队列 
线程队列有task_done() 和 join()
标准库里的例子
往队列内放结束标志,注意do_work阻塞可能无法结束,需要用超时
  1. import queue
  2. def worker():
  3. while True:
  4. item = q.get()
  5. if item is None:
  6. break
  7. do_work(item)
  8. q.task_done()
  9. q = queue.Queue()
  10. threads = []
  11. for i in range(num_worker_threads):
  12. t = threading.Thread(target=worker)
  13. t.start()
  14. threads.append(t)
  15. for item in source():
  16. q.put(item)
  17. q.join()
  18. for i in range(num_worker_threads):
  19. q.put(None)
  20. for t in threads:
  21. t.join()

 

 

9 优先级队列 PriorityQueue
  1. import threading
  2. from random import randint
  3. from queue import PriorityQueue
  4. q = PriorityQueue()
  5. def double(n):
  6. return n * 2
  7. def producer():
  8. count = 0
  9. while 1:
  10. if count > 5:
  11. break
  12. pri = randint(0, 100)
  13. print(\'put :{}\'.format(pri))
  14. q.put((pri, double, pri)) # (priority, func, args)
  15. count += 1
  16. def consumer():
  17. while 1:
  18. if q.empty():
  19. break
  20. pri, task, arg = q.get()
  21. print(\'[PRI:{}] {} * 2 = {}\'.format(pri, arg, task(arg)))
  22. q.task_done()
  23. time.sleep(0.1)
  24. t = threading.Thread(target=producer)
  25. t.start()
  26. time.sleep(1)
  27. t = threading.Thread(target=consumer)
  28. t.start()

 

 

10 线程池
当线程执行相同的任务时用线程池
10.1 multiprocessing.pool 中的线程池
  1. from multiprocessing.pool import ThreadPool
  2. pool = ThreadPool(5)
  3. pool.map(lambda x: x**2, range(5))

 

10.2 multiprocessing.dummy
  1. from multiprocessing.dummy import Pool

 

10.3 concurrent.futures.ThreadPoolExecutor
  1. from concurrent.futures improt ThreadPoolExecutor
  2. from concurrent.futures import as_completed
  3. import urllib.request
  4. URLS = [\'http://www.baidu.com\', \'http://www.hao123.com\']
  5. def load_url(url, timeout):
  6. with urllib.request.urlopen(url, timeout=timeout) as conn:
  7. return conn.read()
  8. with ThreadPoolExecutor(max_workers=5) as executor:
  9. future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
  10. for future in as_completed(future_to_url):
  11. url = future_to_url[future]
  12. try:
  13. data = future.result()
  14. execpt Exception as exc:
  15. print("%r generated an exception: %s" % (url, exc))
  16. else:
  17. print("%r page is %d bytes" % (url, len(data)))

 


11 启动多进程,等待多个进程结束
  1. import multiprocessing
  2. jobs = []
  3. for i in range(2):
  4. p = multiprocessing.Process(target=fib, args=(12,))
  5. p.start()
  6. jobs.append(p)
  7. for p in jobs:
  8. p.join()

 


12 进程池
12.1 multiprocessing.Pool
  1. from multiprocessing import Pool
  2. pool = Pool(2)
  3. pool.map(fib, [36] * 2)

 

 
12.2 concurrent.futures.ProcessPoolExecutor
  1. from concurrent.futures import ProcessPoolExecutor
  2. import math
  3. PRIMES = [ 112272535095293, 112582705942171]
  4. def is_prime(n):
  5. if n < 2:
  6. return False
  7. if n == 2:
  8. return True
  9. if n % 2 == 0:
  10. return False
  11. sqrt_n = int(math.floor(math.sqrt(n)))
  12. for i in range(3, sqrt_n + 1, 2):
  13. if n % i == 0:
  14. return False
  15. return True
  16. if __name__ == "__main__":
  17. with ProcessPoolExecutor() as executor:
  18. for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
  19. print("%d is prime: %s" % (number, prime))

 


 
13 asyncio
 
13.1 最基本的示例,单个任务
  1. import asyncio
  2. async def hello():
  3. print("Hello world!")
  4. await asyncio.sleep(1)
  5. print("Hello again")
  6. loop = asyncio.get_event_loop()
  7. loop.run_until_complete(hello())
  8. loop.close()

 

13.2 最基本的示例,多个任务
  1. import asyncio
  2. async def hello():
  3. print("Hello world!")
  4. await asyncio.sleep(1)
  5. print("Hello again")
  6. loop = asyncio.get_event_loop()
  7. tasks = [hello(), hello()]
  8. loop.run_until_complete(asyncio.wait(tasks))
  9. loop.close()

 

 
13.3 结合httpx 执行多个任务并接收返回结果
httpx 接口和 requests基本一致
  1. import asyncio
  2. import httpx
  3. async def get_url():
  4. r = await httpx.get("http://www.baidu.com")
  5. return r.status_code
  6. loop = asyncio.get_event_loop()
  7. tasks = [get_url() for i in range(10)]
  8. results = loop.run_until_complete(asyncio.gather(*tasks))
  9. loop.close()
  10. for num, result in zip(range(10), results):
  11. print(num, result)

 

 
 

版权声明:本文为junmoxiao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/junmoxiao/p/11948993.html