Python Concurrency Summary: Multithreading, Multiprocessing, and Async IO

import time

def profile(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print('COST: {}'.format(end - start))
        return result
    return wrapper

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

# fib is deliberately left undecorated: putting @profile on fib itself would
# route every recursive call through the wrapper, so only the top-level call
# is wrapped here.
profile(fib)(35)

import threading

for i in range(2):
    t = threading.Thread(target=fib, args=(35,))
    t.start()

main_thread = threading.current_thread()
for t in threading.enumerate():
    if t is main_thread:
        continue
    t.join()

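For comparison, here is a small self-contained sketch (the fib argument 33 and the helper names sequential/threaded are arbitrary choices made here) that times a back-to-back run against a two-thread run; on CPython the threaded version is not faster for this CPU-bound workload because of the GIL.

```python
import threading
import time

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

def sequential():
    fib(33)
    fib(33)

def threaded():
    threads = [threading.Thread(target=fib, args=(33,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

for f in (sequential, threaded):
    start = time.time()
    f()
    print('{}: {:.2f}s'.format(f.__name__, time.time() - start))
```
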
import time
from random import random
from threading import Thread, Semaphore

sema = Semaphore(3)  # at most three threads may hold the semaphore at once

def foo(tid):
    with sema:
        print('{} acquire sema'.format(tid))
        wt = random() * 2
        time.sleep(wt)
    print('{} release sema'.format(tid))

threads = []
for i in range(5):
    t = Thread(target=foo, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

import time
from threading import Thread, Lock

value = 0
lock = Lock()

def getlock():
    global value  # value is reassigned below, so it must be declared global
    with lock:
        new = value + 1
        time.sleep(0.001)
        value = new

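The getlock function above is defined but never exercised; a minimal usage sketch (the thread count of 100 is an arbitrary choice) showing the lock keeping the counter consistent:

```python
import time
from threading import Thread, Lock

value = 0
lock = Lock()

def getlock():
    global value
    with lock:
        new = value + 1
        time.sleep(0.001)
        value = new

threads = [Thread(target=getlock) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(value)  # reliably 100 with the lock; without it, updates are lost
```
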
import time
import threading

def consumer(cond):
    t = threading.current_thread()
    with cond:
        cond.wait()
        print("{}: Resource is available to consumer".format(t.name))

def producer(cond):
    t = threading.current_thread()
    with cond:
        print("{}: Making resource available".format(t.name))
        cond.notify_all()

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))

c1.start()
c2.start()
time.sleep(1)  # let both consumers reach cond.wait() before notifying
p.start()

import time
import threading
from random import randint

TIMEOUT = 2

def consumer(event, l):
    t = threading.current_thread()
    while 1:
        event_is_set = event.wait(TIMEOUT)
        if event_is_set:
            try:
                integer = l.pop()
                print('{} popped from list by {}'.format(integer, t.name))
                event.clear()       # reset the event state
            except IndexError:      # tolerate an empty list right after startup
                pass

def producer(event, l):
    t = threading.current_thread()
    while 1:
        integer = randint(10, 100)
        l.append(integer)
        print('{} appended to list by {}'.format(integer, t.name))
        event.set()                 # signal that data is available
        time.sleep(1)

event = threading.Event()
l = []
threads = []
for name in ('consumer1', 'consumer2'):
    t = threading.Thread(name=name, target=consumer, args=(event, l))
    t.start()
    threads.append(t)
p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)
for t in threads:
    t.join()  # the loops above never exit, so stop the demo with Ctrl-C

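Since both loops above run forever, the final joins never return and the demo has to be killed by hand. One way to make it stop cleanly is a second Event used as a shutdown flag (a sketch; the extra stop event and the five-second run time are choices made here, not part of the original example):

```python
import threading
import time
from random import randint

def consumer(event, l, stop):
    while not stop.is_set():
        if event.wait(timeout=2):
            try:
                print('{} popped {}'.format(threading.current_thread().name, l.pop()))
                event.clear()
            except IndexError:
                pass

def producer(event, l, stop):
    while not stop.is_set():
        l.append(randint(10, 100))
        event.set()
        time.sleep(1)

event, stop, l = threading.Event(), threading.Event(), []
workers = [threading.Thread(target=consumer, args=(event, l, stop)),
           threading.Thread(target=producer, args=(event, l, stop))]
for w in workers:
    w.start()
time.sleep(5)  # let the demo run for a few seconds
stop.set()     # ask both loops to exit
for w in workers:
    w.join()
```
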
import queue
import threading

# do_work(), source() and num_worker_threads are assumed to be defined elsewhere
def worker():
    while True:
        item = q.get()
        if item is None:    # sentinel value: shut the worker down
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

q.join()         # block until all items have been processed

for i in range(num_worker_threads):
    q.put(None)  # one sentinel per worker
for t in threads:
    t.join()

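The snippet above is the pattern from the queue module documentation and assumes do_work(), source() and num_worker_threads exist elsewhere; here is a self-contained sketch with stand-in definitions (the stub bodies and the worker count are illustrative only):

```python
import queue
import threading

num_worker_threads = 4

def do_work(item):           # stand-in for real per-item processing
    print('processing', item)

def source():                # stand-in for the real item source
    return range(20)

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        if item is None:     # sentinel: time to exit
            break
        do_work(item)
        q.task_done()

threads = [threading.Thread(target=worker) for _ in range(num_worker_threads)]
for t in threads:
    t.start()
for item in source():
    q.put(item)
q.join()                     # wait until every item has been processed
for i in range(num_worker_threads):
    q.put(None)              # one sentinel per worker
for t in threads:
    t.join()
```
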
import time
import threading
from random import randint
from queue import PriorityQueue

q = PriorityQueue()

def double(n):
    return n * 2

def producer():
    count = 0
    while 1:
        if count > 5:
            break
        pri = randint(0, 100)
        print('put :{}'.format(pri))
        q.put((pri, double, pri))  # (priority, func, args)
        count += 1

def consumer():
    while 1:
        if q.empty():
            break
        pri, task, arg = q.get()
        print('[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg)))
        q.task_done()
        time.sleep(0.1)

t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()

from multiprocessing.pool import ThreadPool

pool = ThreadPool(5)
print(pool.map(lambda x: x ** 2, range(5)))

# multiprocessing.dummy exposes the same thread-backed pool under the usual Pool name
from multiprocessing.dummy import Pool

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import urllib.request

URLS = ['http://www.baidu.com', 'http://www.hao123.com']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print("%r generated an exception: %s" % (url, exc))
        else:
            print("%r page is %d bytes" % (url, len(data)))

import multiprocessing

# fib is the recursive function defined earlier; with the spawn start method
# (Windows, macOS) this block should live under `if __name__ == '__main__':`
jobs = []
for i in range(2):
    p = multiprocessing.Process(target=fib, args=(12,))
    p.start()
    jobs.append(p)
for p in jobs:
    p.join()

from multiprocessing import Pool

pool = Pool(2)
print(pool.map(fib, [36] * 2))

from concurrent.futures import ProcessPoolExecutor
import math

PRIMES = [112272535095293, 112582705942171]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print("%d is prime: %s" % (number, prime))

import asyncio

async def hello():
    print("Hello world!")
    await asyncio.sleep(1)
    print("Hello again")

loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()

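get_event_loop() plus run_until_complete() still works but is discouraged on recent Python versions; since Python 3.7 the same demo can be written with asyncio.run():

```python
import asyncio

async def hello():
    print("Hello world!")
    await asyncio.sleep(1)
    print("Hello again")

asyncio.run(hello())  # creates, runs and closes the event loop in one call
```
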
import asyncio

async def hello():
    print("Hello world!")
    await asyncio.sleep(1)
    print("Hello again")

loop = asyncio.get_event_loop()
# asyncio.wait() expects tasks/futures; bare coroutines are rejected on Python 3.11+
tasks = [loop.create_task(hello()), loop.create_task(hello())]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

import asyncio
import httpx

async def get_url():
    # httpx.get() is the synchronous API; async requests go through AsyncClient
    async with httpx.AsyncClient() as client:
        r = await client.get("http://www.baidu.com")
    return r.status_code

loop = asyncio.get_event_loop()
tasks = [get_url() for i in range(10)]
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

for num, result in zip(range(10), results):
    print(num, result)

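With httpx it is usually better to share one AsyncClient across requests (so connections are pooled) and to drive everything with asyncio.run(); a sketch of the same ten-request fan-out under those assumptions:

```python
import asyncio
import httpx

async def get_url(client):
    r = await client.get("http://www.baidu.com")
    return r.status_code

async def main():
    # one shared client reuses connections across the ten requests
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(*(get_url(client) for _ in range(10)))

results = asyncio.run(main())
for num, status in enumerate(results):
    print(num, status)
```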