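Overview
The threading module provides a way to run multiple threads (smaller units split off from a process) concurrently within a single process. It lets you create and manage threads so that several tasks can make progress at the same time while sharing one memory space. Threads are particularly well suited to I/O-bound tasks, such as file operations or network requests, where most of the time is spent waiting for external resources.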
The docstring at the top of the threading module's source reads:
Thread module emulating a subset of Java's threading model.
In other words, the module's API is modeled on a subset of Java's threading model.
Running threads
import time
import threading

def fetch_info():
    time.sleep(5)  # simulate a slow I/O operation
    print("Info fetched")

tasks = []
for _ in range(5):
    t = threading.Thread(target=fetch_info)
    tasks.append(t)
print(tasks)

for task in tasks:
    task.start()

# The main thread does not wait for the threads, so this prints first.
print("All tasks done!")
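Running the program above prints all of its output within about 5 seconds, because the five threads sleep concurrently. Note that "All tasks done!" appears before any "Info fetched" line, since the main thread does not wait for the threads. The output is: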
[<Thread(Thread-1 (fetch_info), initial)>, <Thread(Thread-2 (fetch_info), initial)>, <Thread(Thread-3 (fetch_info), initial)>, <Thread(Thread-4 (fetch_info), initial)>, <Thread(Thread-5 (fetch_info), initial)>]
All tasks done!
Info fetched
Info fetched
Info fetched
Info fetched
Info fetched
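Waiting for threads to finish
The join method makes the calling thread wait until the thread it is called on has finished.
Consider a child thread whose job is to modify a file. If the main thread reads that file before the child thread has finished, it will see incomplete or inconsistent data.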
import time
import threading

def fetch_info():
    time.sleep(5)  # simulate a slow I/O operation
    print("Info fetched")

tasks = []
for _ in range(5):
    t = threading.Thread(target=fetch_info)
    tasks.append(t)
print(tasks)

for task in tasks:
    task.start()

# Block the main thread until every child thread has finished.
for task in tasks:
    task.join()

print("All tasks done!")
Output:
[<Thread(Thread-1 (fetch_info), initial)>, <Thread(Thread-2 (fetch_info), initial)>, <Thread(Thread-3 (fetch_info), initial)>, <Thread(Thread-4 (fetch_info), initial)>, <Thread(Thread-5 (fetch_info), initial)>]
Info fetched
Info fetched
Info fetched
Info fetched
Info fetched
All tasks done!
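join also accepts an optional timeout, and is_alive reports whether a thread is still running. The following is a minimal sketch rather than part of the original example; the function name slow_task and the sleep durations are illustrative assumptions.

```python
import threading
import time

def slow_task():
    # Hypothetical stand-in for a slow I/O operation.
    time.sleep(0.5)

t = threading.Thread(target=slow_task)
t.start()

# Wait at most 0.1 s. join() returns None whether or not the thread
# finished, so is_alive() is used to find out what happened.
t.join(timeout=0.1)
still_running = t.is_alive()

# Wait without a timeout until the thread has finished.
t.join()
finished = not t.is_alive()

print(still_running, finished)
```

Checking is_alive after a timed join is how a caller can tell a timeout apart from normal completion.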
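Checking whether a thread has started
In Python multithreading, threading.Event is a synchronization mechanism for passing a signal between threads, so that a thread knows when it may proceed and when it should pause.
Basic concept: an Event holds an internal flag, and threads can call the following methods:
wait(): block until the flag is True
set(): set the flag to True and wake all waiting threads
clear(): set the flag to False
is_set(): return the current state of the flag
Example: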
import threading
import time

event = threading.Event()

def worker():
    print("Worker waiting for event...")
    event.wait()  # block until the event is set
    print("Worker starts working!")

t = threading.Thread(target=worker)
t.start()

time.sleep(3)
print("Main thread sets event")
event.set()  # wake the worker
Output:
Worker waiting for event...
Main thread sets event
Worker starts working!
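The worker thread blocks while the event is unset and continues only after the main thread sets it. An Event object is best used only once. Thread scheduling is unpredictable, so if the event is also reset with clear(), you cannot know which runs first: the thread that resets the event or the thread that acts on its flag. A waiting thread may therefore miss the signal.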
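Tying this back to the section title, a one-shot Event can also be used in the other direction, so that the launching thread learns when a worker has actually started running. This is a minimal sketch; the name started and the 5-second timeout are illustrative assumptions.

```python
import threading

started = threading.Event()

def worker():
    # Signal the launcher that this thread is now running.
    started.set()
    # ... the real work would go here ...

t = threading.Thread(target=worker)
t.start()

# wait() returns True if the flag was set, False if the timeout expired first.
ok = started.wait(timeout=5)
t.join()

print("worker started:", ok)
```

The event is set once and never cleared, which avoids the ordering problem described above.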
Thread locks
When several threads modify the same resource, a race condition is likely. For example:
import threading

a = 0

def add():
    global a
    for i in range(1000000):
        a += 1

t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print(a)
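Note: on Python 3.9 and earlier, the program above usually prints a value other than 2000000, while on Python 3.10 and later it usually prints exactly 2000000. This is a CPython implementation detail, not a guarantee, so shared updates still need a lock. For the underlying reason, see the video 【python】听说因为有GIL,多线程连锁都不需要了? (roughly: "I heard that because of the GIL, multithreaded code doesn't even need locks?").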
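Because the result above depends on the interpreter version, it can help to see a lost update that does not depend on timing. The sketch below splits the increment into an explicit read and write and uses a Barrier to force both threads to read before either one writes. The names counter, both_have_read, and unsafe_increment are illustrative assumptions, and the barrier is there only to make the interleaving repeatable.

```python
import threading

counter = 0
both_have_read = threading.Barrier(2)

def unsafe_increment():
    global counter
    current = counter        # step 1: read the shared value
    both_have_read.wait()    # hold here until the other thread has also read
    counter = current + 1    # step 2: write back a value based on a stale read

threads = [threading.Thread(target=unsafe_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 1, not 2: one increment was lost
```

Both threads read 0 and both write 1, which is the same read-modify-write interleaving that makes a += 1 unsafe without a lock.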
To prevent the problems caused by several threads competing for the same resource, we can protect the resource with a Lock:
import threading

a = 0
lock = threading.Lock()

def add():
    global a
    lock.acquire()  # only one thread at a time can get past this point
    for i in range(1000000):
        a += 1
    lock.release()

t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print(a)
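The acquire and release pair above can also be written with the lock as a context manager, which releases the lock even if the body raises an exception. This is a minimal sketch; the names total, total_lock, and add_many, and the iteration counts, are illustrative and not from the original.

```python
import threading

total = 0
total_lock = threading.Lock()

def add_many(n):
    global total
    for _ in range(n):
        # The lock is acquired on entry to the block and released on exit.
        with total_lock:
            total += 1

workers = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(total)  # 40000
```

Here the lock is held per increment rather than for the whole loop, which lets the threads interleave at the cost of more locking overhead.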
This eliminates the race condition. Now let's address the problem of deadlock:
import threading

lock = threading.Lock()

def f1():
    lock.acquire()
    f2()  # f2 tries to acquire the lock that f1 is still holding
    lock.release()

def f2():
    lock.acquire()
    print("ok")
    lock.release()

t1 = threading.Thread(target=f1)
t2 = threading.Thread(target=f2)
t1.start()
t2.start()
t1.join()
t2.join()
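Run as written, this program is guaranteed to hang. f1 acquires the lock and then calls f2, which tries to acquire the same lock again from the same thread. A plain Lock is not reentrant, so t1 blocks forever waiting for a lock that it already holds. If t1 took the lock before t2 did, t2 also waits forever, because the lock is never released.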
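The non-reentrant behavior can be observed without hanging the program by making the second acquire non-blocking. This is a small sketch using only Lock.acquire(blocking=False); the variable name second_try is an illustrative assumption.

```python
import threading

lock = threading.Lock()

lock.acquire()
# A second blocking acquire from this thread would wait forever, so probe
# without blocking instead. It returns False because the lock is already held.
second_try = lock.acquire(blocking=False)
lock.release()

print(second_try)  # False
```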
import threading

lock = threading.RLock()

def f1():
    lock.acquire()
    f2()  # the same thread may acquire an RLock again without blocking
    lock.release()

def f2():
    lock.acquire()
    print("ok")
    lock.release()

t1 = threading.Thread(target=f1)
t2 = threading.Thread(target=f2)
t1.start()
t2.start()
t1.join()
t2.join()
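Using a reentrant lock, RLock, solves the problem. A reentrant lock must be released by the thread that acquired it. Once a thread holds the lock, the same thread can acquire it again without blocking, and it must release the lock once for every time it acquired it.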
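The "release once per acquire" rule can be checked by probing the lock from a second thread after each release. The sketch below is illustrative only; the helper try_from_other_thread and the variable names are assumptions introduced for this example.

```python
import threading

rlock = threading.RLock()

def try_from_other_thread():
    """Return True if a different thread can take the lock right now."""
    result = []

    def probe():
        got = rlock.acquire(blocking=False)
        if got:
            rlock.release()
        result.append(got)

    t = threading.Thread(target=probe)
    t.start()
    t.join()
    return result[0]

rlock.acquire()
rlock.acquire()                        # same thread: does not block
held_twice = try_from_other_thread()   # lock is held (count 2)
rlock.release()
held_once = try_from_other_thread()    # still held (count 1)
rlock.release()
released = try_from_other_thread()     # fully released

print(held_twice, held_once, released)  # False False True
```

Other threads can take the lock only after the owning thread has released it as many times as it acquired it.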
Limiting concurrency
import threading
import time

sem = threading.Semaphore(2)

def worker(i):
    with sem:
        print(f"worker {i} enter")
        time.sleep(2)
        print(f"worker {i} leave")

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
Semaphore limits the number of threads that can access a resource at the same time. In the example above the limit is two.
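One way to confirm the limit is to count how many workers are inside the guarded section at once and record the highest value seen. This is a minimal sketch; the names LIMIT, active, peak, and stats_lock are illustrative assumptions.

```python
import threading
import time

LIMIT = 2
sem = threading.Semaphore(LIMIT)

active = 0   # workers currently inside the guarded section
peak = 0     # highest value of `active` observed
stats_lock = threading.Lock()

def worker():
    global active, peak
    with sem:
        with stats_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.1)  # simulated work
        with stats_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrency:", peak)
```

The peak never exceeds LIMIT, even though five threads were started.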
Waiting on a condition
import threading
import time

cond = threading.Condition()
queue = []

def producer():
    time.sleep(1)
    with cond:
        queue.append("task")
        print("produce task")
        cond.notify()  # wake one waiting consumer

def consumer():
    with cond:
        # Re-check the queue after every wake-up.
        while not queue:
            print("consumer wait")
            cond.wait()  # releases the lock while waiting
        item = queue.pop()
        print("consume", item)

t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start()
t2.start()
t1.join()
t2.join()
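The loop around cond.wait() in the consumer can also be expressed with Condition.wait_for, which re-checks a predicate after each wake-up and accepts a timeout. This is a sketch under assumptions: the consumed list and the 5-second timeout are introduced here for illustration.

```python
import threading

cond = threading.Condition()
queue = []
consumed = []

def consumer():
    with cond:
        # wait_for returns the predicate's last value, which is False
        # only if the timeout expired before the queue was filled.
        if cond.wait_for(lambda: len(queue) > 0, timeout=5):
            consumed.append(queue.pop())

def producer():
    with cond:
        queue.append("task")
        cond.notify()

t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start()
t2.start()
t1.join()
t2.join()

print(consumed)  # ['task']
```

The result is the same whichever thread runs first: if the producer has already added the item, the predicate is true on the first check and the consumer does not wait.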
Delayed execution
import threading
import time

def task():
    print("task run", time.time())

print("start", time.time())
t = threading.Timer(2, task)
t.start()
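After two seconds, the timer thread prints its message. A Timer runs the given function in a separate thread once the specified interval has elapsed.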
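A Timer that has not fired yet can be stopped with cancel, and arguments can be passed to the function through args. The sketch below is illustrative; the names fired, kept, and cancelled and the 0.2-second interval are assumptions.

```python
import threading

fired = []

def task(label):
    fired.append(label)

# args are passed through to the function when the timer fires.
kept = threading.Timer(0.2, task, args=("kept",))
cancelled = threading.Timer(0.2, task, args=("cancelled",))
kept.start()
cancelled.start()

cancelled.cancel()  # only has an effect while the timer is still waiting
kept.join()         # Timer is a Thread subclass, so join works
cancelled.join()

print(fired)  # ['kept']
```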
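Making threads wait for each other
A Barrier makes a group of threads wait at a synchronization point until all of them have arrived, and then lets them continue together.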
import threading
import time

barrier = threading.Barrier(3)

def worker(i):
    print(f"worker {i} prepare")
    time.sleep(i)  # simulate different preparation times
    print(f"worker {i} wait")
    barrier.wait()  # block until all 3 workers have arrived
    print(f"worker {i} start")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
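Run this program and you will see that the "start" lines are all printed together at the end, after every worker has reached the barrier.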
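The ordering can also be checked programmatically by recording events instead of printing them. This is a minimal sketch; the names events, events_lock, and record, and the shortened sleep times, are illustrative assumptions.

```python
import threading
import time

barrier = threading.Barrier(3)
events = []
events_lock = threading.Lock()

def record(kind):
    with events_lock:
        events.append(kind)

def worker(i):
    time.sleep(0.05 * i)  # workers arrive at different times
    record("wait")
    barrier.wait()        # blocks until all 3 workers have arrived
    record("start")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(events)  # ['wait', 'wait', 'wait', 'start', 'start', 'start']
```

No worker can record "start" until all three have recorded "wait", so the three "wait" entries always come first.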