1. 多线程编程与线程安全相关重要概念

在我的上篇博文 聊聊Python中的GIL 中,我们熟悉了几个特别重要的概念:GIL,线程,进程, 线程安全,原子操作

以下是简单回顾,详细介绍请直接看聊聊Python中的GIL 

  • GIL:  Global Interpreter Lock,全局解释器锁。为了解决多线程之间数据完整性和状态同步的问题,设计为在任意时刻只有一个线程在解释器中运行。
  • 线程:程序执行的最小单位。
  • 进程:系统资源分配的最小单位。
  • 线程安全:多线程环境中,共享数据同一时间只能有一个线程来操作。
  • 原子操作:原子操作就是不会因为进程并发或者线程并发而导致被中断的操作。

还有一个重要的结论:当对全局资源存在写操作时,如果不能保证写入过程的原子性,会出现脏读脏写的情况,即线程不安全。Python的GIL只能保证原子操作的线程安全,因此在多线程编程时我们需要通过加锁来保证线程安全。

最简单的锁是互斥锁(同步锁),互斥锁是用来解决io密集型场景产生的计算错误,即目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据。

下面我们会来介绍如何使用互斥锁。

 

2. Threading.Lock实现互斥锁的简单示例

我们通过Threading.Lock()来实现锁。

以下是线程不安全的例子:

  1. >>> import threading
  2. >>> import time
  3. >>> def sub1():
  4. global count
  5. tmp = count
  6. time.sleep(0.001)
  7. count = tmp + 1
  8. time.sleep(2)
  9. >>> count = 0
  10. >>> def verify(sub):
  11. global count
  12. thread_list = []
  13. for i in range(100):
  14. t = threading.Thread(target=sub,args=())
  15. t.start()
  16. thread_list.append(t)
  17. for j in thread_list:
  18. j.join()
  19. print(count)
  20. >>> verify(sub1)
  21. 14

在这个例子中,我们把

  1. count+=1

代替为

  1. tmp = count
  2. time.sleep(0.001)
  3. count = tmp + 1

是因为,尽管count+=1是非原子操作,但是因为CPU执行的太快了,比较难以复现出多进程的非原子操作导致的进程不安全。经过代替之后,尽管只sleep了0.001秒,但是对于CPU的时间来说是非常长的,会导致这个代码块执行到一半,GIL锁就释放了。即tmp已经获取到count的值了,但是还没有将tmp + 1赋值给count。而此时其他线程如果执行完了count = tmp + 1, 当返回到原来的线程执行时,尽管count的值已经更新了,但是count = tmp + 1是个赋值操作,赋值的结果跟count的更新的值是一样的。最终导致了我们累加的值有很多丢失。

下面是线程安全的例子,我们可以用threading.Lock()获得锁

  1. >>> count = 0
  2. >>> def sub2():
  3. global count
  4. if lock.acquire(1):
        #acquire()是获取锁,acquire(1)返回获取锁的结果,成功获取到互斥锁为True,如果没有获取到互斥锁则返回False
  5. tmp = count
  6. time.sleep(0.001)
  7. count = tmp + 1
  8. time.sleep(2)
  9. lock.release() 一系列操作结束之后需要释放锁
  10. >>> def verify(sub):
  11. global count
  12. thread_list = []
  13. for i in range(100):
  14. t = threading.Thread(target=sub,args=())
  15. t.start()
  16. thread_list.append(t)
  17. for j in thread_list:
  18. j.join()
  19. print(count)
  20. >>> verify(sub2)
  21. 100

 

 获取锁和释放锁的语句也可以用Python的with来实现,这样更简洁

  1. >>> count = 0
  2. >>> def sub3():
  3. global count
  4. with lock:
  5. tmp = count
  6. time.sleep(0.001)
  7. count = tmp + 1
  8. time.sleep(2)
  9. >>> def verify(sub):
  10. global count
  11. thread_list = []
  12. for i in range(100):
  13. t = threading.Thread(target=sub,args=())
  14. t.start()
  15. thread_list.append(t)
  16. for j in thread_list:
  17. j.join()
  18. print(count)
  19. >>> verify(sub3)
  20. 100

 

3.  两种死锁情况及处理

死锁产生的原因

两种死锁:

3.1 迭代死锁与递归锁(RLock)

该情况是一个线程“迭代”请求同一个资源,直接就会造成死锁。这种死锁产生的原因是我们标准互斥锁threading.Lock的缺点导致的。标准的锁对象(threading.Lock)并不关心当前是哪个线程占有了该锁;如果该锁已经被占有了,那么任何其它尝试获取该锁的线程都会被阻塞,包括已经占有该锁的线程也会被阻塞

下面是例子,

  1. #/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3.  
  4. import threading
  5. import time
  6. count_list = [0,0]
  7. lock = threading.Lock()
  8. def change_0():
  9. global count_list
  10. with lock:
  11. tmp = count_list[0]
  12. time.sleep(0.001)
  13. count_list[0] = tmp + 1
  14. time.sleep(2)
  15. print("Done. count_list[0]:%s" % count_list[0])
  16. def change_1():
  17. global count_list
  18. with lock:
  19. tmp = count_list[1]
  20. time.sleep(0.001)
  21. count_list[1] = tmp + 1
  22. time.sleep(2)
  23. print("Done. count_list[1]:%s" % count_list[1])
  24. def change():
  25. with lock:
  26. change_0()
    time.sleep(0.001)
  27. change_1()
  28. def verify(sub):
  29. global count_list
  30. thread_list = []
  31. for i in range(100):
  32. t = threading.Thread(target=sub, args=())
  33. t.start()
  34. thread_list.append(t)
  35. for j in thread_list:
  36. j.join()
  37. print(count_list)
  38. if __name__ == "__main__":
  39. verify(change)

示例中,我们有一个共享资源count_list,有两个分别取这个共享资源第一部分和第二部分的数字(count_list[0]和count_list[1])。两个访问函数都使用了锁来确保在获取数据时没有其它线程修改对应的共享数据。
现在,如果我们思考如何添加第三个函数来获取两个部分的数据。一个简单的方法是依次调用这两个函数,然后返回结合的结果。

这里的问题是,如有某个线程在两个函数调用之间修改了共享资源,那么我们最终会得到不一致的数据。

最明显的解决方法是在这个函数中也使用lock。然而,这是不可行的。里面的两个访问函数将会阻塞,因为外层语句已经占有了该锁

结果是没有任何输出,死锁。

为了解决这个问题,我们可以用threading.RLock代替threading.Lock

  1. #/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3.  
  4. import threading
  5. import time
  6. count_list = [0,0]
  7. lock = threading.RLock()
  8. def change_0():
  9. global count_list
  10. with lock:
  11. tmp = count_list[0]
  12. time.sleep(0.001)
  13. count_list[0] = tmp + 1
  14. time.sleep(2)
  15. print("Done. count_list[0]:%s" % count_list[0])
  16. def change_1():
  17. global count_list
  18. with lock:
  19. tmp = count_list[1]
  20. time.sleep(0.001)
  21. count_list[1] = tmp + 1
  22. time.sleep(2)
  23. print("Done. count_list[1]:%s" % count_list[1])
  24. def change():
  25. with lock:
  26. change_0()
    time.sleep(0.001)
  27. change_1()
  28. def verify(sub):
  29. global count_list
  30. thread_list = []
  31. for i in range(100):
  32. t = threading.Thread(target=sub, args=())
  33. t.start()
  34. thread_list.append(t)
  35. for j in thread_list:
  36. j.join()
  37. print(count_list)
  38. if __name__ == "__main__":
  39. verify(change)

 

3.2 互相等待死锁与锁的升序使用

死锁的另外一个原因是两个进程想要获得的锁已经被对方进程获得,只能互相等待又无法释放已经获得的锁,而导致死锁。假设银行系统中,用户a试图转账100块给用户b,与此同时用户b试图转账500块给用户a,则可能产生死锁。
2个线程互相等待对方的锁,互相占用着资源不释放。

下面是一个互相调用导致死锁的例子:

  1. #/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3.  
  4. import threading
  5. import time
  6. class Account(object):
  7. def __init__(self, name, balance, lock):
  8. self.name = name
  9. self.balance = balance
  10. self.lock = lock
  11. def withdraw(self, amount):
  12. self.balance -= amount
  13. def deposit(self, amount):
  14. self.balance += amount
  15. def transfer(from_account, to_account, amount):
  16. with from_account.lock:
  17. from_account.withdraw(amount)
  18. time.sleep(1)
  19. print("trying to get %s\'s lock..." % to_account.name)
  20. with to_account.lock:
  21. to_account_deposit(amount)
  22. print("transfer finish")
  23. if __name__ == "__main__":
  24. a = Account(\'a\',1000, threading.Lock())
  25. b = Account(\'b\',1000, threading.Lock())
  26. thread_list = []
  27. thread_list.append(threading.Thread(target = transfer, args=(a,b,100)))
  28. thread_list.append(threading.Thread(target = transfer, args=(b,a,500)))
  29. for i in thread_list:
  30. i.start()
  31. for j in thread_list:
  32. j.join()

最终的结果是死锁:

  1. trying to get account a\'s lock...
  2. trying to get account b\'s lock...

即我们的问题是:

你正在写一个多线程程序,其中线程需要一次获取多个锁,此时如何避免死锁问题。
解决方案:
多线程程序中,死锁问题很大一部分是由于线程同时获取多个锁造成的。举个例子:一个线程获取了第一个锁,然后在获取第二个锁的 时候发生阻塞,那么这个线程就可能阻塞其他线程的执行,从而导致整个程序假死。 其实解决这个问题,核心思想也特别简单:目前我们遇到的问题是两个线程想获取到的锁,都被对方线程拿到了,那么我们只需要保证在这两个线程中,获取锁的顺序保持一致就可以了。举个例子,我们有线程thread_a, thread_b, 锁lock_1, lock_2。只要我们规定好了锁的使用顺序,比如先用lock_1,再用lock_2,当线程thread_a获得lock_1时,其他线程如thread_b就无法获得lock_1这个锁,也就无法进行下一步操作(获得lock_2这个锁),也就不会导致互相等待导致的死锁。简言之,解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁,这个规则使用上下文管理器 是非常容易实现的,示例如下:

 

  1. #/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3.  
  4. import threading
  5. import time
  6. from contextlib import contextmanager
  7. thread_local = threading.local()
  8. @contextmanager
  9. def acquire(*locks):
  10. #sort locks by object identifier
  11. locks = sorted(locks, key=lambda x: id(x))
  12. #make sure lock order of previously acquired locks is not violated
  13. acquired = getattr(thread_local,\'acquired\',[])
  14. if acquired and (max(id(lock) for lock in acquired) >= id(locks[0])):
  15. raise RuntimeError(\'Lock Order Violation\')
  16. # Acquire all the locks
  17. acquired.extend(locks)
  18. thread_local.acquired = acquired
  19. try:
  20. for lock in locks:
  21. lock.acquire()
  22. yield
  23. finally:
  24. for lock in reversed(locks):
  25. lock.release()
  26. del acquired[-len(locks):]
  27. class Account(object):
  28. def __init__(self, name, balance, lock):
  29. self.name = name
  30. self.balance = balance
  31. self.lock = lock
  32. def withdraw(self, amount):
  33. self.balance -= amount
  34. def deposit(self, amount):
  35. self.balance += amount
  36. def transfer(from_account, to_account, amount):
  37. print("%s transfer..." % amount)
  38. with acquire(from_account.lock, to_account.lock):
  39. from_account.withdraw(amount)
  40. time.sleep(1)
  41. to_account.deposit(amount)
  42. print("%s transfer... %s:%s ,%s: %s" % (amount,from_account.name,from_account.balance,to_account.name, to_account.balance))
  43. print("transfer finish")
  44. if __name__ == "__main__":
  45. a = Account(\'a\',1000, threading.Lock())
  46. b = Account(\'b\',1000, threading.Lock())
  47. thread_list = []
  48. thread_list.append(threading.Thread(target = transfer, args=(a,b,100)))
  49. thread_list.append(threading.Thread(target = transfer, args=(b,a,500)))
  50. for i in thread_list:
  51. i.start()
  52. for j in thread_list:
  53. j.join()

我们获得的结果是

  1. 100 transfer...
  2. 500 transfer...
  3. 100 transfer... a:900 ,b:1100
  4. transfer finish
  5. 500 transfer... b:600, a:1400
  6. transfer finish

成功的避免了互相等待导致的死锁问题。

在上述代码中,有几点语法需要解释:

  • 1. 装饰器@contextmanager是用来让我们能用with语句调用锁的,从而简化锁的获取和释放过程。关于with语句,大家可以参考浅谈 Python 的 with 语句(https://www.ibm.com/developerworks/cn/opensource/os-cn-pythonwith/)。简言之,with语句在调用时,先执行 __enter__()方法,然后执行with结构体内的语句,最后执行__exit__()语句。有了装饰器@contextmanager. 生成器函数中 yield 之前的语句在 __enter__() 方法中执行,yield 之后的语句在 __exit__() 中执行,而 yield 产生的值赋给了 as 子句中的 value 变量。
  • 2. try和finally语句中实现的是锁的获取和释放。
  • 3. try之前的语句,实现的是对锁的排序,以及锁排序是否被破坏的判断。

今天我们主要讨论了Python多线程中如何保证线程安全,互斥锁的使用方法。另外着重讨论了两种导致死锁的情况:迭代死锁与互相等待死锁,以及这两种死锁的解决方案:递归锁(RLock)的使用和锁的升序使用。

对于多线程编程,我们将在下一篇文章讨论线程同步(Event)问题,以及对Python多线程模块(threading)进行总结。

 

参考文献:

1. 深入理解 GIL:如何写出高性能及线程安全的 Python 代码 http://python.jobbole.com/87743/

2. Python中的原子操作 https://www.jianshu.com/p/42060299c581

3. 详解python中的Lock与RLock https://blog.csdn.net/ybdesire/article/details/80294638

4. 深入解析Python中的线程同步方法 https://www.jb51.net/article/86599.htm

5.  Python中死锁的形成示例及死锁情况的防止 https://www.jb51.net/article/86617.htm

6.  举例讲解 Python 中的死锁、可重入锁和互斥锁 http://python.jobbole.com/82723/

7.  python基础之多线程锁机制

8. python–threading多线程总结

9. Python3入门之线程threading常用方法

10. 浅谈 Python 的 with 语句 https://www.ibm.com/developerworks/cn/opensource/os-cn-pythonwith/

版权声明:本文为ArsenalfanInECNU原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/ArsenalfanInECNU/p/10022740.html