引言
虽然说 Python 受限于 CPython 的实现,存在的 GIL 会导致我们在使用多线程的时候,没法利用多核跑多线程。但是有的时候还是会用到线程的,尤其是针对一些 I/O 密集型的任务,也可以使用它们。
在使用多线程编程时,我们随时需要注意竞态条件(race condition)和数据竞争(data race)的问题,前者会导致我们在不同的时间点运行程序得到的输出可能不同;而后者则更为可怕,容易导致共享的数据结构被错误修改,甚至导致程序崩溃或者出现莫名其妙的 Bug。这个时候自然就要用到 Python threading 模块为我们提供的若干同步原语了。
那么,我们常用的 Lock、RLock、条件变量(Condition Variables)、信号量(Semaphore)等是如何实现的呢?接下来的源码学习是基于 CPython master 分支的线程模块。希望在学习完它们的实现后,能够加深理解,合理运用。
源码学习
CPython 的 threading 模块实际上是基于 Java 的线程模型实现的,所以熟悉 Java 的话,自然也不会对该模块的实现感到陌生。该模块是基于更底层的 _thread
模块,抽象出更加方便使用的线程模型,核心包括 threading.Thread
线程类封装,便于用户继承或组合;此外还有一些同步原语的实现。Python/thread_nt.h
文件中是 C 语言实现的底层和线程有关的函数(如锁的创建和维护、线程的创建和管理)。
同步原语
Lock
该模块中,Lock
其实是使用了底层 _thread.allocate_lock
函数来创建锁的。代码也很简单:
1 | Lock = _allocate_lock |
Lock 为我们提供了 acquire()
和 release()
这两个主要的方法。当一个线程持有锁时,其它线程调用 acquire()
方法时会被阻塞(此时线程一般就是睡眠等待了),直到主动 release()
后,等待锁的线程会被唤醒。
关于 Lock 有两点值得注意:
- 该锁是不可重入的,也就是如果在一个函数中递归
acquire()
会导致死锁的问题。为了避免这种问题,一般会使用RLock
来代替 - Lock 并非 Mutex(互斥锁),且它底层是通过信号量那样实现的,本身不会记录谁持有了该锁,也就是说 Lock 可以在不同的线程中被引用,可以在主线程获取,而在子线程释放它。具体可以在
CPython/Python/thread_nt.h:PyThread_allocate_lock
可以看到它的实现如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28/*
* Lock support. It has to be implemented as semaphores.
* I [Dag] tried to implement it with mutex but I could find a way to
* tell whether a thread already own the lock or not.
* Lock 支持:它必须以信号量的方式来实现。我尝试使用互斥锁实现过,但是我
* 发现了另外一种方式可以得知一个线程是否持有了锁。
*/
PyThread_type_lock
PyThread_allocate_lock(void)
{
PNRMUTEX aLock;
dprintf(("PyThread_allocate_lock called\n"));
if (!initialized)
PyThread_init_thread();
aLock = AllocNonRecursiveMutex() ;
dprintf(("%lu: PyThread_allocate_lock() -> %p\n", PyThread_get_thread_ident(), aLock));
return (PyThread_type_lock) aLock;
}
// 其中 PNRMUTEX 定义如下,它并不会告诉我们当前是哪个线程
// 持有了锁
typedef struct _NRMUTEX
{
PyMUTEX_T cs;
PyCOND_T cv;
int locked;
} NRMUTEX;
typedef NRMUTEX *PNRMUTEX;
比较有趣的是,其实 PyCOND
即条件变量是通过信号量来实现的;而接下来我们会看到,在 Python 的 threading 模块中,我们使用了 Condition 实现了信号量。
RLock
RLock 就是可重入锁(Reentrant Lock),它可以被持有锁的线程多次执行 acquire()
,而不会发生阻塞和死锁的问题。它的实现思路很简单:
- 规定如果一个线程成功持有了该锁,则将该锁的所有权交给该线程,并且只有该线程可以释放锁,其它线程无法释放;
- 当在持有锁的线程中递归获取锁的时候,实际并不会执行底层的
_lock.acquire()
方法,而是只给计数器递增;且释放锁的时候也是先给计数器递减,直到为 0 后才会释放锁。
所以在使用 RLock 的时候一定要记得 acquire()
和 release()
的调用次数得匹配才能真正释放锁。接下来简单看下源码实现:
1 | def RLock(*args, **kwargs): |
Condition
条件变量是后面几个同步原语实现的基础,值得重点学习下。条件变量的实现原理比较简单:所有等待的线程会被加入到等待队列中,只有在需要的时候会被唤醒(可以想想如何实现 waiter 线程的等待和唤醒呢?)。
在分析源码前,我们可以看看 Condition
类提供了哪些主要接口:
wait(timeout=None)
,线程可以调用该接口等待被唤醒notify()
,线程可以调用该接口通知队列中一个或多个等待线程被唤醒
接下来看看源码实现:
1 | class Condition: |
Semaphore
1 | class Semaphore: |
Event
1 | class Event: |
Barrier
通常可以使用 Barrier 实现并发初始化,然后一切就绪后才会进入下一个阶段。应用示例如下:
1 | # coding: utf-8 |
接下来看看 Barrier 是如何实现的:
1 | # Barrier 是基于部分的 `pthread_barrier_*` API 和 Java 中的 `CyclicBarrier` |
总结
Python 源码的注释太丰富了,以至于我都不想翻译成中文。所以结合注释看代码即可~