返回 导航

Python / AI

hangge.com

Python - 多线程threading开发使用详解2(各种线程锁、with语句、GIL、死锁)

作者:hangge | 2022-11-24 09:10

五、线程锁

1,基本介绍

(1)当多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期,这被称为“线程不安全”。比如下面样例,我们创建两个子线程同时对 number 变量进行计算并赋值。虽然我们期望的最终结果应该是 2000000,但实际结果并不是如此。这是由于多个线程同时对一个变量进行存取操作造成的。
import threading

number = 0

def plus():
    global number       # global声明此处的number是外面的全局变量number
    for _ in range(1000000):    # 进行一个大数级别的循环加一运算
        number += 1

for i in range(2):  # 用2个子线程,就可以观察到脏数据
    t = threading.Thread(target=plus)
    t.start()

time.sleep(2)       # 等待2秒,确保2个子线程都已经结束运算。
print("主线程执行完毕后,number = ", number)

(2)为解决这个问题,我们可以使用线程锁。Python threading 模块中定义了几种线程锁,分别是:
  • Lock同步锁(又叫互斥锁,一次只能放行一个线程)
  • RLock递归锁(又叫可重入锁,一次只能放行一个线程)
  • Condition条件锁(一次可以放行任意个线程)
  • Event事件锁(一次全部放行线程)
  • Semaphore信号量锁(一次可以放行特定个线程)

2,Lock(同步锁)

    同步锁是一种独占锁,同一时刻只有一个线程可以访问共享的数据。使用很简单,初始化锁对象,然后将锁当做参数传递给任务函数,在任务中加锁,使用后释放锁。
import threading
import time

number = 0
lock = threading.Lock() # 创建一个同步锁对象

def plus():
    global number       # global声明此处的number是外面的全局变量number
    for _ in range(1000000):    # 进行一个大数级别的循环加一运算
        lock.acquire()        # 开始加锁
        number += 1
        lock.release()        # 释放锁

for i in range(2):  # 用2个子线程,就可以观察到脏数据
    t = threading.Thread(target=plus)
    t.start()

time.sleep(2)       # 等待2秒,确保2个子线程都已经结束运算。
print("主线程执行完毕后,number = ", number)

3,RLock(递归锁)

    递归锁是同步锁的一个升级版本,在同步锁的基础上可以做到连续重复使用多次 acquire() 后再重复使用多次 release() 的操作。
注意:加锁次数和解锁次数必须一致,否则也将引发死锁现象。
import threading
import time

number = 0
lock = threading.RLock() # 创建一个递归锁对象

def plus():
    global number       # global声明此处的number是外面的全局变量number
    for _ in range(1000000):    # 进行一个大数级别的循环加一运算
        lock.acquire()        # 开始加锁
        lock.acquire()
        number += 1
        lock.release()        # 释放锁
        lock.release()

for i in range(2):  # 用2个子线程,就可以观察到脏数据
    t = threading.Thread(target=plus)
    t.start()

time.sleep(2)       # 等待2秒,确保2个子线程都已经结束运算。
print("主线程执行完毕后,number = ", number)

4,Condition(条件锁)

(1)条件锁是在递归锁的基础上增加了能够暂停线程运行的功能,并且可以自由设定一次放行几个线程。其依然是通过 acquire()/release() 加锁解锁,并使用 wait( ) notify() 来控制线程执行的个数。
  • wait([timeout]):该方法将使线程进入 Condition 的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
  • notify():该方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用 acquire() 尝试获得锁定(进入锁定池),其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
  • notifyAll():该方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
import threading
from time import sleep

currentRunThreadNumber = 0 # 当前运行的线程数量
maxSubThreadNumber = 10 # 子线程总数

def task():
    global currentRunThreadNumber
    thName = threading.currentThread().name

    condLock.acquire()  # 上锁
    print("线程就绪并等待 : %s" % thName)

    condLock.wait()  # 暂停线程运行、等待唤醒
    currentRunThreadNumber += 1
    print("线程开始运行 : %s" % thName)

    condLock.release()  # 解锁


if __name__ == "__main__":
    condLock = threading.Condition() # 创建一个条件锁

    # 创建子线程
    for i in range(maxSubThreadNumber):
        subThreadIns = threading.Thread(target=task)
        subThreadIns.start()

    # 通过循环进行设置每次通知的线程数量
    while currentRunThreadNumber < maxSubThreadNumber:
        notifyNumber = int(
            input("请输入需要通知运行的线程数量:"))

        condLock.acquire() # 上锁
        condLock.notify(notifyNumber)  # 放行
        condLock.release() # 解锁

    print("主线程结束")

(2)上面样例程序运行后会启动 10 个子线程,并且会立即将 10 个子线程设置为等待状态。然后我们可以发送一个或者多个通知,来恢复被等待的子线程继续运行。运行结果如下:

5,Event(事件锁)

(1)事件锁是基于条件锁来做的,它与条件锁的区别在于一次只能放行全部,不能放行任意个数量的子线程继续运行。我们可以将事件锁看为红绿灯,当红灯时所有子线程都暂停运行,并进入“等待”状态,当绿灯时所有子线程都恢复“运行”。
事件主要提供了 clear()set()wait() is_set() 四个方法,各方法作用如下:
  • clear():调用该方法会将事件的 Flag 设置为 False(红灯)。
  • set():调用该方法会将 Flag 设置为 True(绿灯)。
  • wait():调用该方法将等待“红绿灯”信号。
  • is_set():该方法用于判断当前是否"绿灯放行"状态
import threading
from time import sleep

maxSubThreadNumber = 3 # 最大子线程数量
event = threading.Event() # 创建一个事件对象

# 子线程函数
def task():
    while True:
        thName = threading.currentThread().name
        if event.is_set():      # 判断当前是否"绿灯放行"状态
            print("%s 线程运行中。" % thName)
            sleep(1)
        else:
            print("红灯亮, %s 停止运行." % thName) 
            event.wait()
            print("%s 看到绿灯亮了,继续运行。" % thName)
    #print("线程 %s 结束" % thName) 
    
if __name__ == "__main__":
    for i in range(maxSubThreadNumber):
      subThreadIns = threading.Thread(target = task)
      subThreadIns.start()
    event.set()# 设置为绿灯
    sleep(3)
    event.clear()# 设置为红灯 
    sleep(3)
    event.set()# 设置为绿灯

(2)上面是一个模拟红绿灯控制汽车通行的例子,运行结果如下:

6,Semaphore(信号量锁)

(1)信号量锁也是根据条件锁来做的,它允许一定数量的线程同时更改数据。比如地铁安检,排队人很多,工作人员只允许一定数量的人进入安检区,其它的人继续排队。
import time
import threading

# 设置允许5个线程同时运行的信号量锁
semaphore = threading.BoundedSemaphore(5)

def run(n):
    semaphore.acquire() # 获取信号量
    print("run the thread: %s" % n, "at:", time.ctime())
    time.sleep(5)
    semaphore.release() # 释放信号量

# 创建20个线程
for i in range(20):
    t = threading.Thread(target=run, args=(i,))
    t.start()

(2)运行结果如下,可以看到5个一批的线程被放行:

六、with 语句

1,基本介绍

(1)所有的线程锁都有一个加锁和释放锁的动作,非常类似文件的打开和关闭。在加锁后,如果线程执行过程中出现异常或者错误,没有正常的释放锁,那么其他的线程会造到致命性的影响。通过 with 上下文管理器,可以确保锁被正常释放。
注意:只有事件锁(Event)不能够使用 with 语句,其他类型的锁均可以使用 with 语句。

(2)with语句的使用格式如下:
with some_lock:
    # 执行任务...

(3)其相当于如下代码,可以看到及时出现异常或者错误也能够正常释放锁:
some_lock.acquire()
try:
    # 执行任务..
finally:
    some_lock.release()

2,使用样例

下面使用 with 语句对本文第 2 个同步锁的样例进行改造,代码如下,高亮代码为改动的部分:
import threading
import time
 
number = 0
lock = threading.Lock() # 创建一个同步锁对象
 
def plus():
    global number       # global声明此处的number是外面的全局变量number
    for _ in range(1000000):    # 进行一个大数级别的循环加一运算
        with lock: # 自动加锁/解锁
            number += 1
 
for i in range(2):  # 用2个子线程,就可以观察到脏数据
    t = threading.Thread(target=plus)
    t.start()
 
time.sleep(2)       # 等待2秒,确保2个子线程都已经结束运算。
print("主线程执行完毕后,number = ", number)

七、全局解释器锁(GIL)

1,Python 多线程的特点

(1)在 Python 中,无论 CPU 有多少核,同时只能执行一个线程。这是由于 GIL 的存在导致的。多线程的工作流程如下:
  • 拿到公共数据
  • 申请 GIL
  • Python 解释器调用操作系统原生线程
  • cpu 执行运算
  • 当该线程执行一段时间消耗完,无论任务是否已经执行完毕,都会释放 GIL
  • 下一个被 CPU 调度的线程重复上面的过程

(2)Python 针对不同类型的任务,多线程执行效率是不同的:
  • 对于 CPU 密集型任务(各种循环处理、计算等等):由于计算工作多,ticks 计数很快就会达到阈值,然后触发 GIL 的释放与再竞争(多个线程来回切换是需要消耗资源的),所以 Python 下的多线程对 CPU 密集型任务并不友好。
  • IO 密集型任务(文件处理、网络通信等涉及数据读写的操作):多线程能够有效提升效率(单线程下有 IO 操作会进行 IO 等待,造成不必要的时间浪费,而开启多线程能在线程 A 等待时,自动切换到线程 B,可以不浪费 CPU 的资源,从而能提升程序执行效率)。所以 Python 的多线程对 IO 密集型任务比较友好。

2,什么是 GIL?

(1)GIL 的全称是 Global Interpreter Lock(全局解释器锁),是 Python 设计之初为了数据安全所做的决定。
(2)Python 中的某个线程想要执行,必须先拿到 GIL。可以把 GIL 看作是执行任务的“通行证”,并且在一个 Python 进程中,GIL 只有一个。拿不到通行证的线程,就不允许进入 CPU 执行。
(3)GIL 只在 CPython 解释器中才有,因为 CPython 调用的是 c 语言的原生线程,不能直接操作 cpu,只能利用 GIL 保证同一时间只能有一个线程拿到数据。在 PyPy JPython 中没有 GIL
提示:虽然在 Python 的不同解释器实现中,如 PyPy 就移除了 GIL,其执行速度更快(不单单是去除 GIL 的原因)。但是,我们通常使用的 CPython 解释器版本还是占有着统治地位的使用量。

八、死锁

1,什么是死锁?

    死锁是由于两个或以上的线程互相持有对方需要的资源,且都不释放占有的资源,导致这些线程处于等待状态,既无法执行,也无法结束,只能靠操作系统强制终止。

2,产生死锁的四个必要条件

死锁产生的 4 个必要条件分别如下,只要其中任意一个不满足,就不会死锁。
  • 互斥性:线程对资源的占有是排他性的,一个资源只能被一个线程占有,直到释放。
  • 请求和保持条件:一个线程对请求被占有资源发生阻塞时,对已经获得的资源不释放。
  • 不剥夺:一个线程在释放资源之前,其他的线程无法剥夺占用。
  • 循环等待:发生死锁时,线程进入死循环,永久阻塞。

3,常见的3种死锁的类型

(1)静态的锁顺序死锁a b 两个方法都需要获得 A 锁和 B 锁。一个线程执行 a 方法且已经获得了 A 锁,在等待 B 锁;另一个线程执行了 b 方法且已经获得了 B 锁,在等待 A 锁。这种状态,就是发生了静态的锁顺序死锁。
提示:静态是指在程序中,对于某个锁来说加锁和解锁的位置是不变的。

(2)动态的锁顺序死锁:指两个线程调用同一个方法时,传入的参数颠倒造成的死锁。
提示:这里动态是指某个锁会根据参数的传递,在不同的位置加锁和解锁。

(3)协作对象之间的死锁:如果在持有锁时调用某个外部方法,那么将可能出现死锁问题。在这个外部方法中可能会获得其他锁,或者阻塞时间过长,导致其他线程无法及时获得当前被持有的锁。为了避免这种危险的情况发生,我们使用开放调用。
提示:如果调用某个外部方法时不需要持有锁,我们称之为开放调用。

4,避免死锁的方法

(1)有序资源分配法。即对锁进行排序,所有线程都以相同的顺序获得锁。
解决静态的锁顺序死锁:所有需要多个锁的线程,都要以相同的顺序来获得锁。
解决动态的锁顺序死锁:比较传入锁对象的哈希值,根据哈希值的大小来确保所有的线程都以相同的顺序获得锁 。

(2)使用定时锁。即加上一个超时时间,若一个线程没有在给定的时限内成功获得所有需要的锁,则会进行回退并释放所有已经获得的锁,然后等待一段随机的时间再重试。
注意:但是如果有非常多的线程同一时间去竞争同一批资源,就算有超时和回退机制,还是可能会导致这些线程重复地尝试但却始终得不到锁。

(3)银行家算法。是检查申请人对资源的最大需求量,如果现在各种资源都能满足的申请人的要求,就能满足申请人的要求,申请人很快就能完成计算,释放占有的资源,保证系统的所有过程都能完成,可以避免死锁。
注意:理论上银行家算法可以非常有效地避免死锁。但是,从某种意义上说,它缺乏实用价值,因为很少有进程能够在运行前就知道其所需资源的最大值,而且进程数也不是固定的,往往在不断地变化(如新用户登录或退出),况且原本可用的资源也可能突然间变成不可用(如磁带机可能坏掉)。因此,在实际中,如果有,也只有极少的系统使用银行家算法来避免死锁。
评论

全部评论(0)

回到顶部