Skip to main content

第八课:线程和进程

(一)进程

(1)什么是进程

想象一个情景:

有一个艺术家正在画一幅画。这位艺术家(父进程)在画到一半时决定让他的学徒(子进程)接手继续完成画作。在 fork 的情况下,学徒得到了和艺术家完全一样的画布、颜料和画框(即父进程的所有资源和环境的精确副本),然后从艺术家停止的那一刻起开始继续画画(即子进程从父进程分叉点开始执行)。

在 Unix 和 Linux 系统中,fork 系统调用就像这个比喻。当一个进程调用 fork 时,它创建了一个新进程,这个新进程几乎是原进程的完全副本。它们拥有相同的内存布局、相同的环境设置、相同的打开的文件描述符等。子进程从 fork 调用返回的地方开始执行。


现在,想象另一种情况:

其中艺术学院要求一个新的学徒(子进程)开始一幅全新的画作。在这里,学徒不会接手任何现有的画作,而是从头开始,只按照老师(父进程)给出的指导(如画画的主题或风格)来创作。

这就类似于在 Windows 系统中使用 multiprocessing 模块时的 spawn 方法。当你在 Python 中创建一个新的进程,Windows 上的 multiprocessing 默认使用 spawn 方法。在这种方式下,新进程从头开始执行,Python 解释器会被重新初始化。然后,只有必要的资源和信息(如要执行的目标函数和参数)被传递给子进程。意味着子进程不会继承父进程的内存状态或环境设置。


因此我们看到多进程 在 Unix 平台上的实现依赖于 fork,在 Windows 平台上的实现依赖于 spawn,但都属于Python 的 multiprocessing

  • Fork 的优点在于它的效率较高,因为不需要重新初始化整个进程和资源。

​ 但它的缺点在于复制父进程的状态可能会引入一些复杂性,特别是在多线程环境中。

  • Spawn 的优点在于它更简单、更干净,每个进程都是从一个已知的初始状态开始的。

​ 但缺点是它的启动成本更高,因为需要重新初始化新进程的环境。


(2)Fork 函数

fork() 是 Unix/Linux 操作系统提供的一个系统调用,它用于创建一个新的进程,称为子进程,它是当前进程的一个副本。

fork() 对父进程(即调用 fork() 的原始进程)和子进程都返回,但返回的值不同:

  • 在父进程中,fork() 返回新创建的子进程的进程ID。

  • 在子进程中,fork() 返回 0。

这种机制允许区分父进程和子进程,因为它们的执行代码是相同的。


Python 的 os 模块提供了对 fork() 系统调用的封装,使得在 Python 程序中可以使用这个功能。

注意以下os里的fork函数只能在linux和unix的python中使用!!windows没有这个函数!!

import os
print('Process (%s) start...' % os.getpid())#在父进程中打印其进程ID
pid = os.fork()                             #创建一个子进程。这行代码之后的执行流会在父进程和子进程中各自继续。
if pid == 0:   #如果 fork() 返回 0,表明这是子进程。打印自己的进程ID(os.getpid())和父进程ID(os.getppid())
    print('子进程(%s) 的父进程是 %s' % (os.getpid(), os.getppid()))
else:          #如果 fork() 返回非0值,表明这是父进程,返回值是子进程的ID。父进程打印自己的进程ID和子进程ID。
    print('进程(%s) 创造了子进程(%s)' % (os.getpid(), pid))
Process (5413) start...
进程(5413) 创造了子进程(5423)
/var/folders/yr/8nl3z9zn3t1fz0m_jw2261p00000gn/T/ipykernel_5413/859268877.py:3: DeprecationWarning: This process (pid=5413) is multi-threaded, use of fork() may lead to deadlocks in the child.
pid = os.fork() #创建一个子进程。这行代码之后的执行流会在父进程和子进程中各自继续。
Process (5413) start...
子进程(5423) 的父进程是 5413
  • os.getpid() 返回当前进程的进程ID

  • os.getppid() 返回当前进程的父进程的进程ID。

一个父进程可以 fork 出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用 getppid() 就可以拿到父进程的 ID。

注意:这里 ifelse 语句都会执行!

当调用 pid = os.fork() 后,当前进程(父进程)被复制,并创建了一个子进程。pid 在父进程和子进程中的值是不一样的。

  • 父进程中,pid 是子进程的进程ID。
  • 子进程中,pid是 0。

这意味着在 fork() 调用点之后的代码会在两个不同的进程中独立执行:一次在父进程中,一次在子进程中

你可以理解为这段代码被复制了一份,在两个地方同时执行!!所以一个pid是0,另一个pid不是0,所以if和else都会执行!!


有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,

常见的 Apache服务器 就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。



(3)多进程的基本写法(Process)

from multiprocessing import Process

def exe(data):
    print(f'{data}进程已启动')
if __name__=="__main__":
    p1 = Process(target=exe, args=(1, )) 
    #`Process`类创建了进程对象,`target`参数我们传入一个函数来表示进程启动后要执行的代码,
    # 后面的`args`是一个元组,它代表了传递给函数(在这里就是exe函数)的参数data。注意此时exe函数没有括号
    p1.start()   #`Process`对象的`start`方法用来启动进程
    p2 = Process(target=exe, args=(2, ))
    p2.start()
    p1.join()     #join方法表示等待进程行结束,用于确保在主线程中需要等待子线p1、p2完成后才能继续的情况
    p2.join()
danger

注意:在 Windows 上运行多进程时,必须使用 if __name__=="__main__"

  • 仅在脚本被直接运行时,在 if __name__ == '__main__' 内的代码才会被执行.

  • 反之若该脚本被import导入,则 if __name__ == '__main__'内的代码不会被执行。

在 Windows 系统上,multiprocessing 使用 spawn 方法来创建新进程。所以当你创建一个新的进程时,实际上启动了一个全新的 Python 解释器进程,然后从头开始运行脚本,这就类似于被import时的行为。如果没有 if __name__ == '__main__',那么子进程还会再执行诸如 p1 = Process(target=exe, args=(1, )) 之类的代码,这就会导致循环的问题等等。


而在 Unix/Linux 系统中,multiprocessing 使用 fork 方法创建子进程,

因此子进程知道它是从哪里开始运行的,不需要重新执行整个脚本。也不需要 if __name__ == '__main__'



(4)进程池批量创建子进程

使用进程池需要使用 Pool 模块,基本步骤如下:

  1. 创建 Pool 对象:指定池中工作进程的数量。如果不指定,池的大小默认为系统 CPU 核心的数量。

  2. 使用 mapapply_async 方法:

    • map 方法适用于需要将函数应用于可迭代对象中的每个元素的情况。
    • apply_async 方法适用于更灵活的异步函数调用。
  3. 使用 close() 方法关闭池,这表明我们不再向池中添加新的任务。然后使用 join() 方法等待所有工作进程完成。


1. map 方法

map 类似于 Python 内置的 map 函数。它将一个函数应用于一个可迭代对象的所有元素。

import multiprocessing
def square(n):
    print('进程收到数据',n)
    return n * n
if __name__ == "__main__":
    pool=multiprocessing.Pool(4)
    list1 = [1, 2, 3, 4, 5]
    results = pool.map(square, list1)    #只有4个进程,其中一个进程在执行完之后再执行输入数据5
    print(results)
进程收到数据 1
进程收到数据 2
进程收到数据 3
进程收到数据 4
进程收到数据 5
[1, 4, 9, 16, 25]

它自动分配可迭代对象 list1 的每个元素到池中的进程,每个进程执行设定的 square 函数任务。最后收集所有进程的返回值到列表 results 中。注意:

  • map 是阻塞的,即它会等待所有结果完成后才继续执行。

  • pool.map 会自动使 列表里的每个元素都会被分配给一个进程,并执行 square 函数,直到所有元素都处理完毕。

  • pool.map 会收集所有进程的返回值,并将它们按照输入列表的顺序组织成一个新的列表,赋值给 results 变量。


2. apply_async 方法

apply_async 是异步的,它不会等待当前任务完成就立即返回一个 AsyncResult 对象。

import multiprocessing
def square(x):
    return x * x
def print_result(result):
    print(f'The result is {result}')

if __name__ == '__main__':
    pool= multiprocessing.Pool(4)
    result = pool.apply_async(square, (3,), callback=print_result)
    print(result.get())

它允许您单独地处理每个任务的结果,并在结果准备好时获取它。

您可以为 apply_async 提供回调函数,当操作完成时,回调函数会被触发。

思考:为什么要使用get()

  • 原因一:阻塞主进程,等待主程序完成

    回调函数 print_resultsquare 函数的执行完成后被调用

    map 不同,在执行 apply_async 时,主程序不会等待异步任务的完成,而是继续执行后续代码。如果程序结束,所有未完成的进程也会随之结束。也就是说 pool.apply_async 之后,主程序会继续执行,不会等待子进程中的 squareprint_result 函数执行结束。

    所以如果没有 get() 的话,可能回调函数 print_result 会来不及执行,就没有返回结果了

    为了确保看到异步任务的结果,需要在程序结束前等待异步任务完成,可以使用 AsyncResult 对象的 get() 方法来阻塞主程序,直到异步任务完成

  • 原因二:获取异步函数返回值

    回调函数 print_result 的输入参数是 apply_async 调用的返回值。

    最后的 result 并不是函数的返回值本身,而是一个封装了异步执行结果的对象。

    要获取异步调用的实际结果,您需要调用这个对象的 get() 方法。


在进程池中批量创建进程

from multiprocessing import Pool
import os, time
def long_time_task(name):
    print(f'启动进程 {name} 地址为:{os.getpid()}')
    time.sleep(3)
    print(f'进程{name}运行结束')

if __name__=='__main__':
    p = Pool(4)  
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    p.close()  #告诉进程池不再接受新的任务
    p.join()
启动进程 0 地址为:8213
启动进程 1 地址为:8215
启动进程 2 地址为:8216启动进程 3 地址为:8214

进程1运行结束进程0运行结束进程2运行结束

进程3运行结束
启动进程 4 地址为:8216
进程4运行结束

注意:和前面的get一样,这里的join不可缺少,因为要等待所有子进程结束后再结束主进程。

注意:这里建立了四个进程的进程池:所以对于给出的5个任务,先执行4个,其中1个执行好了第五个马上接上

注意:p = Pool(4) 还有一种写法,不需要写close

with multiprocessing.Pool(4) as pool:


(5)进程间通信

from multiprocessing import Process, Queue
import time
def put_data(my_queue):   # 一个示例函数,将数据放入队列
    data = [1, 2, 3, 4, 5]
    for item in data:
        my_queue.put(item)
        print(f"存入进程已经把数据 {item} 存入队列")

def get_data(my_queue):   # 另一个示例函数,从队列中获取数据
    time.sleep(1)
    result = []
    while not my_queue.empty():
        item = my_queue.get()
        result.append(item)
        print(f"取出进程已经把数据 {item} 取出队列")
    print("Received data:", result)

if __name__ == "__main__":
    my_queue = Queue()
    p1 = Process(target=put_data, args=(my_queue,))
    p2 = Process(target=get_data, args=(my_queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    # 最后关闭队列,不再使用
    my_queue.close()    
    my_queue.join_thread()
存入进程已经把数据 1 存入队列
存入进程已经把数据 2 存入队列
存入进程已经把数据 3 存入队列
存入进程已经把数据 4 存入队列
存入进程已经把数据 5 存入队列
取出进程已经把数据 1 取出队列
取出进程已经把数据 2 取出队列
取出进程已经把数据 3 取出队列
取出进程已经把数据 4 取出队列
取出进程已经把数据 5 取出队列
Received data: [1, 2, 3, 4, 5]

思考:为什么在get_data中需要先sleep(1)?

因为两个进程是同时进行的,很有可能put_data函数还没填入元素,get_data的while循环就检测到列表为空因此退出了



(6)多进程处理大体量问题

下面这个例子展示了快速计算 的求和,思路为:

  • 把 1 ~ 10000001 按照索引切分成 8 块
  • 每块的计算任务分配给一个进程,添加到进程列表中,并启动
  • 等待所有进程执行完成,把计算结果都压入 队列
  • 从队列中取出数据,求和得到最终答案。
from multiprocessing import Process, Queue   #之前说过这是为了不同线程之间的通信
from random import randint
from time import time


def task_handler(curr_list, result_queue):
    total = 0
    for number in curr_list:
        total += number
    result_queue.put(total)   #将数据 total 放入队列result_queue的操作。这表示将计算得到的结果存储在队列中。

if __name__ == '__main__':
    processes = []
    number_list = [x for x in range(1, 100000001)]
    result_queue = Queue()
    index = 0
    
    # 启动8个进程将数据切片后进行运算
    for _ in range(8):
        p = Process(target=task_handler,args=(number_list[index:index + 12500000], result_queue))
        #在上面这行代码中,由于task_handler函数有两个传入变量,所以args里也要传入两个变量
        index += 12500000
        processes.append(p)
        p.start()
        
    # 开始记录所有进程执行完成花费的时间
    start = time()
    for p in processes:
        p.join()
    
    # 合并执行结果
    total = 0
    while not result_queue.empty():
        total += result_queue.get()  #使用队列的 get() 方法来从队列中取出数据,从而实现数据的共享。
    print(total)
    end = time()
    print('Execution time: ', (end - start), 's', sep='')
5000000050000000
Execution time: 1.1256730556488037s

更高级的库 ProcessPoolExecutor

concurrent.futures 库中的 ProcessPoolExecutor 确实实现了并行计算。

当使用 ProcessPoolExecutor 时,它会创建一个进程池,并将任务分发到不同的进程中去执行。这些进程可以在多个CPU核心上并行运行,从而实现真正的并行计算。这对于计算密集型的任务特别有用,因为它们可以利用多核CPU的优势来加速计算过程。

from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor(max_workers=4)        # 创建一个包含4个工作进程的进程池
# 参数 max_workers 指定了池中的进程数量。如果不指定,默认使用的进程数是机器的CPU核心数。

future = executor.submit(function, *args, **kwargs)  # submit 方法提交要执行的函数和参数,返回一个 Future 对象,可以从中获取任务执行的结果
result = future.result()                             # 获取函数执行的结果。此方法会阻塞当前线程,直到任务完成。
executor.shutdown(wait=True)                         # 关闭进程池,释放资源

# 实例一:
from concurrent.futures import ProcessPoolExecutor
import time

def compute_square(number):
    time.sleep(1)  # 模拟耗时操作
    return number * number

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5]
    with ProcessPoolExecutor(max_workers=3) as executor:
        # 使用列表推导式提交任务并立即收集Future对象
        futures = [executor.submit(compute_square, number) for number in numbers]

        # 通过调用result()等待结果
        results = [future.result() for future in futures]
        print(results)

实例二:

from concurrent.futures import ProcessPoolExecutor
import time

def compute_square(num1,num2):
    time.sleep(1)  # 模拟耗时操作
    return num1 * num2

if __name__=='__main__':
    max_workers=5   #最多并行进程数


    # 生成所有第一年第一天不是NAN的点的索引
    index_list=[i for i in range(10000)]
    tot=len(index_list)
    print('所有点的索引已经加载完成')
    
    with ProcessPoolExecutor(max_workers) as executor:
        for i in index_list:
            num1=i
            num2=i+1
            executor.submit(compute_square, num1,num2)

    print('所有文件处理完成')

(二)线程

(1)什么是线程

多线程 (Threading) 和多进程 (Multiprocessing)

线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。 线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其他线程共享进程所拥有的全部资源。一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发执行


为什么要使用多线程而不用多进程?

  • 进程之间不能共享内存,但线程之间共享内存非常容易。
  • 操作系统在创建进程时,需要为该进程重新分配系统资源,但创建线程的代价则小得多。因此使用多线程来实现多任务并发执行比使用多进程的效率高
  • python语言内置了多线程功能支持,而不是单纯地作为底层操作系统的调度方式,从而简化了python的多线程编程。

"主线程"(Main Thread)是指程序开始执行时由操作系统默认创建的线程。 主线程主要负责执行程序的入口点(如 if __name__ == '__main__': 块)以及创建和管理其他 "子线程" 或 "工作线程"。一旦主线程的代码执行完毕,即使子线程还在运行,整个程序也会结束。

在许多(GUI)应用程序中,主线程用于运行事件循环,处理用户的交互和界面更新。在这种情况下,耗时的操作通常会在子线程中处理,以避免阻塞主线程和界面。

import threading
import time
def worker():
    print("子线程正在运行")
    time.sleep(2)
    print("子线程结束")
if __name__ == '__main__':
    print("主线程开始")
    t = threading.Thread(target=worker)
    t.start()
    print("主线程继续执行")
    t.join()
    print("主线程结束")
主线程开始
子线程正在运行主线程继续执行

子线程结束
主线程结束


(2)多线程的基本写法

1. Thread

import threading
import time
def run(n):
    print('task',n)
    time.sleep(1)
    print('1s'+'\n')
    time.sleep(1)
    print('0s'+'\n')
    time.sleep(1)
t1 = threading.Thread(target=run,args=('t1',))     # target是要执行的函数名,args是函数对应的输入参数
t2 = threading.Thread(target=run,args=('t2',))
t1.start()
t2.start()
t2
1s

1s

0s

0s

再这段代码中,会发现两个换行符都先打出来,然后才是打出来0s。这是因为两个线程几乎在同时运行

注意输入参数不能写成 args='t1' 或者 args=('t1') !

2. 类的继承

from threading import Thread
import time
class MyThread(Thread): #注意不能直接导入threading
    def __init__(self,task,filename):
        super(MyThread,self).__init__() #建议简写为super().__init__(),效果一样的
        # 上一行调用了父类(threading.Thread)的 __init__ 方法。明确指定了要使用 super 的类(MyThread)和实例(self)。
        self._task=task
        self._filename=filename

    def run(self):   #注意,这里必须写run。
        # 线程是通过创建一个Thread对象并重写run方法来实现的。
        # 当你调用start方法时,线程会在后台自动运行run方法。而在一般的类中,run(self)是不会直接执行的
        print('线程:',self._task,'已启动','\n')
        time.sleep(1)
        print('等待中','\n')
        time.sleep(1)
        print('%s已模拟下载完成'%self._filename)

t1 = MyThread('task1','文件2')
t2 = MyThread('task2','文件1')
t1.start()
t2.start()
t1.join()
t2.join()
线程:线程: task1 已启动 

task2 已启动

等待中等待中



文件1已模拟下载完成文件2已模拟下载完成

注意:如果不把start()独立出来写,不仅两个线程的输出会在同一行,而且join()方法不能使用(会报错)



(3)批量创建多个线程

我们以往账户同时存多笔钱为例

import threading
import time
class Account():
    def __init__(self,number,money):
        self.money=money
        self.number=number

class AddMoneyThread(threading.Thread):
    def __init__(self, account, money):
        super().__init__()
        self.account = account
        self.money = money

    def run(self):   
        #注意不要把充值的方法直接放在init中,因为这样在创建线程的过程中会直接进行充值。
        # 放在run里,这样用start方法才会启动充值
        print(f"账户{self.account.number}正在充值{self.money}元")
        time.sleep(1)
        self.account.money += self.money
        print(f'账户{self.account.number}充值完成')

Account_list=[]
for number in range(10):
    Account_list.append(Account(number,10))

operating_list=[]
for account in Account_list:
    operation=AddMoneyThread(account,10)
    operating_list.append(operation)
    operation.start()
    
for operation in operating_list:
    operation.join()   
账户0正在充值10元账户1正在充值10元

账户2正在充值10元
账户3正在充值10元
账户4正在充值10元
账户5正在充值10元
账户6正在充值10元
账户7正在充值10元
账户8正在充值10元
账户9正在充值10元
账户1充值完成账户0充值完成账户9充值完成


账户4充值完成
账户3充值完成
账户7充值完成
账户6充值完成
账户2充值完成
账户5充值完成
账户8充值完成

这里加上一个循环来执行join的意义在于 确保这些线程执行完后再往后执行

由于这里后面没有代码了,因此 没有最后一个join循环是没有关系的!

注意:对于批量创建线程而言,对于单个线程start后不能马上使用join!

join() 方法会阻塞调用它的线程(在这种情况下,是主线程)直到 operation 线程完成。

这意味着您的主线程会等待每个 AddMoneyThread 完成,然后再创建并启动下一个 AddMoneyThread。

这实际上使得充值操作变成了顺序执行,而不是并发执行。



(4)线程竞争

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,

而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,

因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

当多个线程试图同时修改同一个数据时,如果没有适当的同步,就会出现竞态条件。

例如,多个线程都试图修改同一个对象的某个属性,会导致不可预测的结果。

比如以下代码

import threading 
target_number=0

class operater(threading.Thread):
    def __init__(self,order,number):
        super().__init__()
        self.order=order
        self.number=number
    
    def run(self):
        global target_number
        target_number= target_number+ self.number
        print(f'线程{self.order}将目标数据已经修改为{target_number}')
        target_number= target_number- self.number
        print(f'线程{self.order}将目标数据已经修改为{target_number}')

for order in range(1, 10):   
    Operater = operater(order,order)
    Operater.start()

只要这里循环的次数足够多,最后的target_number就可能不是0

因此需要使用锁或其他同步机制


1. 线程锁

在本示例中,我们创建了一个线程锁,每个线程要修改全局参数时获得锁,修改完成后释放锁。未获得锁的线程不可以修改全局参数。

import threading 
target_number = 10
lock = threading.Lock()  # 创建一个锁

class operater(threading.Thread):
    def __init__(self, order):
        super().__init__()
        self.order = order
    
    def run(self):
        global target_number
        with lock:  # 获取锁
            target_number = self.order
            print(f'线程{self.order}将目标数据已经修改为{self.order}')
        # 锁会在这个缩进块之后自动释放

for order in range(1, 5):
    Operater = operater(order)
    Operater.start()

注意:上面代码中的with方式获取锁可以展开写,代码如下

def run(self):
    global target_number
    lock.acquire()
    target_number = self.order
    print(f'线程{self.order}将目标数据已经修改为{self.order}')
    lock.release()

思考:for循环每一次都创建一个线程,但是每个线程的名字都叫Operater,这不会导致下一个线程覆盖前一个线程吗?

即使在循环中使用相同的变量名,每个线程实例也是独立的。每次调用 Operater.start() 都会启动一个新的线程,之前的线程不会受到影响。但是这个的前提是该线程已经被start过后,才不会被覆盖。

如果在for循环创建线程结束后再使用start,那启动的就只有最后定义的一个线程!!

在上面的代码中,为什么这里设置的锁会和修改 target_nmber 这个事件绑定?

锁(Lock)是用来保证在任何时刻只有一个线程可以执行特定代码块的机制。当一个线程获得了锁,其他试图获取该锁的线程会被阻塞,直到锁被释放。其实锁在lock = threading.Lock()中被定义时并没有绑定某一个事件。绑定事件的部分是with lock或者lock.aquire(),因为线程必须要先获得锁才可以执行中间包含的这一段代码

如果有多个任务需要锁,代码可以如下:

import threading
import time
lock1 = threading.Lock()
lock2 = threading.Lock()

def taskA(order):
    with lock1:
        print(f"A类线程{order}正在执行任务A")
        time.sleep(1)
def taskB(order):
    with lock2:
        print(f"B类线程{order}正在执行任务B")
        time.sleep(1)

for i in range(1,5):
    operater1=threading.Thread(target=taskA,args=(i,))
    operater2=threading.Thread(target=taskB,args=(i,))
    operater1.start()
    operater2.start()
A类线程1正在执行任务A
B类线程1正在执行任务B
B类线程2正在执行任务BA类线程2正在执行任务A

A类线程3正在执行任务A
B类线程3正在执行任务B
A类线程4正在执行任务AB类线程4正在执行任务B

从输出结果中我们可以发现任务A和B时同时在进行的


2. threading.local()

使用 threading.local() 创建需要修改的全局变量,他会自带线程锁机制。

import threading
local_school = threading.local()  # 创建全局ThreadLocal对象:

def process_thread(name):
    # 绑定ThreadLocal的student:
    local_school.student = name
    print(f'线程 {threading.current_thread().name} 输出 {local_school.student}' )
    
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

在上面的代码中,全局变量 local_school 就是一个 ThreadLocal 对象,每个 Thread 对它都可以读写 student 属性,但互不影响。

你可以把 local_school 看成全局变量,但每个属性如 local_school.student 都是线程的局部变量,可以任意读写而互不干扰。也不用管理锁的问题,ThreadLocal内部会处理



(5)死锁(DeadLock)

死锁指的是两个或多个线程或进程互相等待对方释放资源,从而导致它们都被永久地阻塞的情况。

简单来说,每个执行单元都在等待一个永远不可能被满足的条件,以便它们可以继续执行。

以下是几个经典的死锁案例

案例一:重复请求

import threading
import time
def operater(order):
    lock.acquire()
    lock.acquire()
    print(f"线程{order}正在执行")
    time.sleep(1)
    print(f"线程{order}执行结束")
    lock.release()
    lock.release()

if __name__=='__main__':
    lock=threading.Lock()
    for order in range(5):
        Operater=threading.Thread(target=operater,args=(order,))
        Operater.start()

案例2:两个资源的循环等待

假设有两个线程以及两个资源(Resource1 和 Resource2),并且每个线程都需要同时访问这两个资源才能完成工作

lock1 = threading.Lock()
lock2 = threading.Lock()

def threadA():
    lock1.acquire()
    print('线程A获取lock1')
    time.sleep(1)  # 模拟操作延时
    lock2.acquire()
    print('线程A获取lock2')
    lock2.release()
    lock1.release()

def threadB():
    lock2.acquire()
    print('线程B获取lock2')
    time.sleep(1)  # 模拟操作延时
    print('线程B获取lock1')
    lock1.release()
    lock2.release()

t1 = threading.Thread(target=threadA)
t2 = threading.Thread(target=threadB)
t1.start()
t2.start()
发生异常: RuntimeError
release unlocked lock
File "/Users/heihe/Desktop/coding-learning/utils/test.py", line 15, in threadA
lock1.release()
RuntimeError: release unlocked lock

案例三:多个线程和多个资源

在一个更复杂的场景中,可能有多个资源和多个线程,每个线程需要不同的资源组合才能完成任务。

lock1 = threading.Lock()
lock2 = threading.Lock()
lock3 = threading.Lock()

def thread1():
    lock1.acquire()
    lock2.acquire()
    lock2.release()
    lock1.release()

def thread2():
    lock2.acquire()
    lock3.acquire()
    lock3.release()
    lock2.release()

def thread3():
    lock3.acquire()
    lock1.acquire()
    lock1.release()
    lock3.release()

t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)
t3 = threading.Thread(target=thread3)
t1.start()
t2.start()
t3.start()

案例4:递归死锁

import threading
lock = threading.Lock()

def operater(count):
    if count == 0:
        return
    lock.acquire()  
    print(f"线程{count} 获取了锁")

    operater(count - 1)  # 递归调用

    lock.release()  
    print(f"线程{count} 释放了锁")

thread = threading.Thread(target=operater, args=(3,))
thread.start()
thread.join()

如何避免死锁

  1. 确保所有线程以相同的顺序请求锁:这可以防止循环等待的发生。

  2. 使用超时在锁请求上:这样线程可以在等待太长时间后放弃锁请求。

  3. 避免持有一个锁同时请求另一个锁:尽量设计避免在持有一个锁的情况下,去请求另一个锁。

  4. 使用线程同步机制,如信号量:这可以帮助更好地控制线程间资源的使用



(6)重入锁(Rlock)

与标准的线程锁(Mutex)相比,重入锁允许同一个线程多次获得锁。

如果一个线程已经持有了 RLock,它可以再次请求不会被阻塞。这避免了因同一线程多次请求同一锁而导致的死锁问题

RLock 内部维护着一个计数器,每次成功调用 acquire(),计数器会增加;每次调用 release(),计数器会减少。只有当计数器回到零时,锁才会真正释放,其他线程才能获取锁。

RLock 主要在需要同一个线程多次获取同一锁的场景中使用,例如在递归调用或者复杂的操作流程中。

在简单的线程同步场景下,使用 Lock 和 RLock 的效果是相同的。

比如下面这段代码,如果 lock=threading.RLock() 改成 lock=threading.Lock(),就会死锁,运行不了

import threading
import time
def operater(order):
    lock.acquire()
    lock.acquire()
    print(f"线程{order}正在执行")
    time.sleep(1)
    print(f"线程{order}执行结束")
    lock.release()
    lock.release()

if __name__=='__main__':
    lock=threading.RLock()
    for order in range(5):
        Operater=threading.Thread(target=operater,args=(order,))
        Operater.start()


(7)信号量(Semaphore)

允许一定数量的线程同时运行:信号量(BoundedSemaphore类)

互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,

import time
import threading
def run(n,semaphore):
    semaphore.acquire()   #加锁
    print(f'线程{n}获得锁')
    time.sleep(1)
    semaphore.release()    #释放
    print(f'线程{n}释放锁')

if __name__== '__main__':
    semaphore = threading.BoundedSemaphore(2)   #最多允许2个线程同时运行
    for i in range(5):
        t = threading.Thread(target=run,args=( i,semaphore))   #注意semaphore要传入函数内!Lock不用!
        t.start()
    while threading.active_count() !=1:
        pass
    else:
        print('所有线程均已完成')

从输出结果来看会发现始终只有两个线程占据了锁,而一个线程一旦释放了锁就会立即被另一个线程获取,而不是两个锁一起释放



(8)线程同步(Event)

threading.Event 是用于线程间通信的同步机制。它允许一个线程控制事件信号,其他线程则可以捕捉这个信号。

Event 有一个属性flag,这个属性有以下几个关键的方法:

  • set(): 将flag设置为真。所有处于等待该事件的线程将被唤醒。

  • clear(): 将flag设置为假。

  • wait(timeout=None): 阻塞这个线程,直到flag为真或直到指定的超时时间。如果flag已经为真,则立即返回。

  • is_set(): 返回flag的状态。如果是真,则返回 True;否则返回 False。

import threading
import time
event = threading.Event()  #内部标志flag默认设置为False

def waiter(event):
    print(f"等待线程开始等待事件信号\n")
    event.wait()  # 阻塞,直到事件被设置
    print(f"等待线程监测到Event的flag为True\n")

def setter(event):
    print("设置线程已启动\n")
    print(f'设置线程将当前flag属性设置为{event.is_set()}\n')
    time.sleep(3)  # 模拟做一些事情
    event.set()  # 设置事件
    print(f'设置线程将当前flag属性设置为{event.is_set()}\n')

waiter_thread = threading.Thread(target=waiter, args=(event,))
setter_thread = threading.Thread(target=setter, args=(event,))
waiter_thread.start()
setter_thread.start()
等待线程开始等待事件信号
设置线程已启动


设置线程将当前flag属性设置为False

设置线程将当前flag属性设置为True
等待线程监测到Event的flag为True

在上面的代码中,在 waiter 线程中,遇到 event.wait() 时,如果 eventflagFalse时会停止运行下面的代码(被阻塞)

同时 event.wait() 会实时监测 flag 的状态。一旦 flag 变为 True 就会继续执行下面的代码

一旦 event.set() 被另一个线程(如 setter 线程)调用,内部标志 flag 变为 True,所有等待该事件的线程(使用 event.wait())将不再阻塞,并继续执行。

注意事项:

wait() 方法可以带有超时时间,这在避免无限等待时非常有用。

Event 对象不用于保护共享资源,它只用于信号传递。如果需要同步对共享资源的访问,应该使用锁(如 Lock 或 RLock)


threading.Event 通常用于以下场景:

  • 一个线程需要等待另一个线程完成特定任务或发生特定事件。

  • 实现线程的暂停、恢复或终止操作。

  • 当多个线程需要同时开始执行任务时(例如:通过事件同步开始信号)。

(9)线程执行效率

python针对不同类型的代码执行效率也是不同的

结论:

  • I/O密集型任务,建议采取多线程,还可以采用多进程+协程的方式(例如:爬虫多采用多线程处理爬取的数据);

  • 对于计算密集型任务,多线程此时就不适用了。

我们把任务分为 I/O密集型(文件处理、网络爬虫等设计文件读写操作)和 计算密集型(各种循环处理、计算等),

而多线程在切换中又分为 I/O切换时间切换

如果任务属于是 I/O密集型:

  • 若不采用多线程,我们在进行I/O操作时,CPU要等待前面一个I/O任务完成后面的I/O任务才能进行,

  • 如果采用多线程的话,刚好可以切换到进行另一个I/O任务,充分利用CPU避免CPU处于闲置状态,提高效率。

如果多线程任务都是计算型:

  • CPU会一直在进行工作,直到一定的时间后采取多线程时间切换的方式进行切换线程,此时CPU一直处于工作状态,

    此种情况下并不能提高性能,相反在切换多线程任务时,可能还会造成时间和资源的浪费,导致效能下降。



(10)守护线程

守护线程通常用于执行一些后台任务,如监控、周期性数据清理等,这些任务不需要等待完成,只要主程序退出,它们就可以被终止。守护线程在编写一些并发程序时非常有用,因为它们可以帮助确保程序在退出时不会因为还有线程在运行而无法正常结束。

设置方法一

import threading
import time
def run(n):
    print('线程',n,'启动')
    time.sleep(1)
    print('线程',n,'结束')

if __name__ == '__main__':
    t=threading.Thread(target=run,args=(1,))
    t.daemon=True
    t.start()
    print('end')
    time.sleep(2)
    # 之所以要在最后加上一个sleep,是因为防止主线程结束了但是但是守护线程未结束,导致守护线程被强行终止
    # 如果没有最后的sleep,会输出“线程1启动”,但来不及  输出“线程1结束”

join方法也可以保证守护线程结束后再结束主进程 把上面最后一段改成:

if __name__ == '__main__':
    t=threading.Thread(target=run,args=(1,))
    t.daemon=True
    t.start()
    t.join()
    print('end')

设置方法二

import threading 
class DownloadTask(threading.Thread):
    def __init__(self, filename):
        super().__init__()
        self._filename = filename  
    def run(self):     
        print('%s 已经下载完毕' % self._filename)
thread=DownloadTask('金瓶梅.txt')
thread.daemon=True
thread.start()
thread.join()