11.4 多线程

CPython 提供了用于从 Python 创建、生成和控制线程的高层次 API 和低层次 API。

要了解 Python 线程,首先应该了解操作系统线程是如何工作的。 CPython 中有两种线程实现:

  1. pthreads: Linux 和 macOS 的 POSIX 线程;

  2. nt 线程: Windows 的 NT 线程。

在“进程的结构”一节中,你看到进程具有以下特征:

  • 用于子程序的

  • 内存;

  • 操作系统上文件套接字的访问途径。

扩展单个进程的最大限制是操作系统将有一个用于该可执行文件的程序计数器

为了解决这个问题,现代操作系统允许进程向操作系统发出信号,将它们分别加载到多个线程中执行。

每个线程都有自己的程序计数器,但使用与宿主进程相同的资源。每个线程也有自己的调用栈,所以它可以执行不同的函数。

因为多个线程可以读写同一个内存空间,所以可能会发生冲突。解决这个问题的方法是线程安全,它涉及确保内存空间在访问之前被单个线程锁定。

具有三个线程的单个进程具有以下结构:

图片内容: Process:进程, Stack:栈, Heap:堆, Process Memory:进程内存, Python Objects:Python 对象, Instructions:指令, CPython Runtime:CPython 运行时, Compiled Modules:编译模块, Files:文件, Locks:锁, Sockets:套接字, Thread 0:线程 0, Program Counter:程序计数器

参见

有关 Python 线程 API 的介绍性教程,请查看 Real Python 的“Intro to Python Threading.”a

ahttps://realpython.com/intro-to-python-threading/

全局解释器锁(GIL)

如果你熟悉 C 中的 NT 线程或 POSIX 线程,或者如果你使用过其他高级语言,那么你可能希望多线程是并行的。

在 CPython 中,线程基于 C API,但却是 Python 线程。这意味着每个 Python 线程都需要通过求值循环来执行 Python 字节码。

Python 求值循环不是线程安全的。解释器状态有很多部分是共享的和全局的,例如垃圾收集器。为了解决这个问题,CPython 开发人员实施了一个称为全局解释器锁 (GIL) 的超级锁。在栈帧求值循环中执行任何操作码之前,线程都需要获取 GIL。操作码被执行完成,GIL 就会被释放。

虽然它为 Python 中的每个操作提供全局线程安全,但这种方法有一个主要缺点。任何需要很长执行时间的操作都会让其他线程等待 GIL 被释放后才能执行。这意味着在任何给定时间只有一个线程可以执行 Python 字节码操作。

要获取 GIL,需要调用 take_gil()。要释放它,需要调用 drop_gil()。 GIL 的获取是在核心栈帧求值循环 _PyEval_EvalFrameDefault() 中进行的。

为了阻止单栈帧执行永久持有 GIL,求值循环状态存储一个标志,gil_drop_request。在栈帧中每个字节码操作完成后,检查此标志,并在重新获取之前临时释放 GIL:

    if (_Py_atomic_load_relaxed(&ceval->gil_drop_request)) {
        /* Give another thread a chance */
        if (_PyThreadState_Swap(&runtime->gilstate, NULL) != tstate) {
            Py_FatalError("ceval: tstate mix-up");
        }
        drop_gil(ceval, tstate);

        /* Other threads may run now */
        
        take_gil(ceval, tstate);
        
        /* Check if we should make a quick exit. */
        exit_thread_if_finalizing(tstate);
        
        if (_PyThreadState_Swap(&runtime->gilstate, tstate) != NULL) {
            Py_FatalError("ceval: orphan tstate");
        }
    }
...

尽管 GIL 对并行执行施加了限制,但它使 Python 中的多线程非常安全并且非常适合并发运行 I/O 密集的任务。

相关源文件

以下是与 threading 相关的源文件:

在 Python 中启动线程

为了演示使用多线程代码(尽管有 GIL)的性能提升,你可以在 Python 中实现一个简单的网络端口扫描器。

你将从克隆之前的脚本开始,但需要把逻辑改为使用 threading.Thread() 为每个端口生成一个线程。类似于 multiprocessing 的 API,它需要一个可调用的 target 和一个 args 元组。

在循环内启动线程,但不要等待它们完成。而是将线程实例添加到列表 threads 中:

    for port in range(80, 100):
        t = Thread(target=check_port, args=(host, port, results))
        t.start()
        threads.append(t)

一旦创建了所有线程,遍历 threads 列表并调用 .join() 等待它们完成:

    for t in threads:
        t.join()

接下来,遍历 results 队列中的所有条目并将它们打印到屏幕上:

    while not results.empty():
        print("Port {0} is open".format(results.get()))

这是整个脚本:

cpython-book-samples/33/portscanner_threads.py

from threading import Thread
from queue import Queue
import socket
import time

timeout = 1.0

def check_port(host: str, port: int, results: Queue):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex((host, port))
    if result == 0:
        results.put(port)
    sock.close()

def main():
    start = time.time()
    host = "localhost" # Replace with a host you own
    threads = []
    results = Queue()
    for port in range(80, 100):
        t = Thread(target=check_port, args=(host, port, results))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    while not results.empty():
        print("Port {0} is open".format(results.get()))
    print("Completed scan in {0} seconds".format(time.time() - start))

if __name__ == '__main__':
    main()

当你在命令行调用这个线程脚本时,它的执行速度将是单线程示例的十倍以上:

$ python portscanner_threads.py
Port 80 is open
Completed scan in 1.0101029872894287 seconds

这也比多进程示例快 50% 到 60%。请记住,多进程启动新进程时有开销。线程也确实有开销,但它要小得多。

你可能想知道,如果 GIL 意味着一次只能执行一个操作,那么为什么多线程示例更快?

下面是耗时 1–1000 毫秒的语句:

    result = sock.connect_ex((host, port))

在 C 扩展模块 Modules/socketmodule.c 中,这个函数实现了连接:

Modules/socketmodule.c 3245 行

static int
internal_connect(PySocketSockObject *s, struct sockaddr *addr, int addrlen,
                 int raise)
{
    int res, err, wait_connect;

    Py_BEGIN_ALLOW_THREADS
    res = connect(s->sock_fd, addr, addrlen);
    Py_END_ALLOW_THREADS

系统调用 connect() 前后用到了 Py_BEGIN_ALLOW_THREADSPy_END_ALLOW_THREADS 宏。这些宏在 Include/ceval.h 中定义如下:

#define Py_BEGIN_ALLOW_THREADS { \
                        PyThreadState *_save; \
                        _save = PyEval_SaveThread();
#define Py_BLOCK_THREADS PyEval_RestoreThread(_save);
#define Py_UNBLOCK_THREADS _save = PyEval_SaveThread();
#define Py_END_ALLOW_THREADS PyEval_RestoreThread(_save); \
                 }

因此,当调用 Py_BEGIN_ALLOW_THREADS 时,它会调用 PyEval_SaveThread()。此函数将线程状态更改为 NULL释放 GIL:

Python/ceval.c 444 行

PyThreadState *
PyEval_SaveThread(void)
{
    PyThreadState *tstate = PyThreadState_Swap(NULL);
    if (tstate == NULL)
        Py_FatalError("PyEval_SaveThread: NULL tstate");
    assert(gil_created());
    drop_gil(tstate);
    return tstate;
}

由于 GIL 被释放,其他任何正在执行的线程都可以继续。该线程将坐下来等待系统调用响应,而不会阻塞求值循环。

一旦 connect() 成功或超时,Py_END_ALLOW_THREADS 宏将以原始线程状态作为参数调用 PyEval_RestoreThread()。恢复线程状态,重新获取 GIL。对 take_gil() 的调用是一个阻塞调用,其将等待信号量:

Python/ceval.c 458 行

void
PyEval_RestoreThread(PyThreadState *tstate)
{
    if (tstate == NULL)
        Py_FatalError("PyEval_RestoreThread: NULL tstate");
    assert(gil_created());
    
    int err = errno;
    take_gil(tstate);
    /* _Py_Finalizing is protected by the GIL */
    if (_Py_IsFinalizing() && !_Py_CURRENTLY_FINALIZING(tstate)) {
        drop_gil(tstate);
        PyThread_exit_thread();
        Py_UNREACHABLE();
    }
    errno = err;
    
    PyThreadState_Swap(tstate);
}

这不是唯一一个被非 GIL 阻塞对 Py_BEGIN_ALLOW_THREADSPy_END_ALLOW_THREADS 包裹的系统调用。标准库中有超过三百处使用了非 GIL 阻塞对,包括:

  • 发出 HTTP 请求;

  • 与本地硬件交互;

  • 加密数据;

  • 读写文件。

线程状态

CPython 提供了自己的线程管理实现。因为线程需要在求值循环中执行 Python 字节码,所以在 CPython 中运行一个线程并不像生成一个操作系统线程那么简单。

Python 线程被称为 PyThread。你在“CPython 求值循环”一章中简要了解了它们。

Python 线程执行代码对象,其由解释器生成。

回顾一下:

  • CPython 有一个运行时,它有自己的运行时状态

  • CPython 可以有一个或多个解释器。

  • 解释器有一个状态称为解释器状态

  • 解释器将获取一个 code 对象并将其转换为一系列 frame 对象

  • 一个解释器至少有一个线程,每个线程都有一个线程状态

  • frame 对象在称为 frame 栈的栈中执行。

  • CPython 引用值栈中的变量。

  • 解释器状态包括其线程的链表。

单线程、单解释器运行时将具有以下状态:

图片内容: Runtime:运行时, Runtime State:运行时状态, Interpreter:解释器, Interpreter State:解释器状态, GIL:不译, Thread 0:线程 0, Primary:主的, Stack:栈, Program Counter:程序计数器, PyThread State:PyThread 状态, Frame:不译, Exceptions:异常, Current:当前的, Next Thread:下个线程, Previous:之前的, Boot State:引导状态, Heap:堆, Core Instructions:核心指令, Modules:模块, Files:文件, Locks:锁, Sockets:套接字

线程状态类型 PyThreadState 具有三十多个属性,包括:

  • 唯一标识符;

  • 链接其他线程状态的链表;

  • 产生该线程的解释器的状态;

  • 当前正在执行的 frame;

  • 当前递归深度;

  • 可选的追踪功能;

  • 当前正在处理的异常;

  • 当前正在处理的任何异步异常;

  • 引发的异常栈;

  • 一个 GIL 计数器;

  • 异步生成器计数器。

与多进程准备数据一样,线程具有引导状态。但是,线程共享相同的内存空间,因此无需序列化数据并通过文件流发送。

线程使用 threading.Thread 类型实例化。这是一个抽象 PyThread 类型的高级模块。 PyThread 实例由 C 扩展模块 _thread 管理。

_thread 模块执行新线程的入口点是 thread_PyThread_start_new_thread()start_new_thread()Thread 类型实例上的方法

新线程按以下顺序实例化:

  1. 创建 bootstate,链接到 target,带有参数 argskwargs

  2. bootstate 链接到解释器状态。

  3. 创建一个新的 PyThreadState,链接到当前的解释器。

  4. 如果尚未启用 GIL,调用 PyEval_InitThreads()

  5. 基于操作系统上的特定实现 PyThread_start_new_thread,启动新线程。

图片内容: Runtime:运行时, Runtime State:运行时状态, Interpreter:解释器, Interpreter State:解释器状态, GIL:不译, Thread 0:线程 0, Primary:主的, Stack:栈, Program Counter:程序计数器, PyThread State:PyThread 状态, Frame:不译, Exceptions:异常, Current:当前的, Next Thread:下个线程, Previous:之前的, Boot State:引导状态, Heap:堆, Core Instructions:核心指令, Modules:模块, Files:文件, Locks:锁, Sockets:套接字, Init:初始化

线程 bootstate 具有以下属性:

对于线程 bootstate,有两种 PyThread 的实现:

  1. 适用于 Linux 和 macOS 的 POSIX 线程

  2. 适用于 Windows 的 NT 线程

这两个实现都会创建操作系统线程,设置其属性,然后在新线程中执行回调函数 t_bootstrap()

此函数使用单个参数 boot_raw 调用,赋值给在 thread_PyThread_start_new_thread() 中构造的 bootstate

t_bootstrap() 函数是低层次线程和 Python 运行时之间的接口。引导程序将初始化线程,然后使用 PyObject_Call() 执行可调用的 target

执行可调用目标后,线程将退出:

图片内容: PyThread:不译, OS Specific Thread Start:操作系统特定线程启动, Inside Thread:线程内部, t_bootstrap:不译, PyObject_Call:不译, target:不译

POSIX 线程

名为 pthreads 的 POSIX 线程在 Python/thread_pthread.h 中有一个实现。此实现抽象了 <pthread.h> C API,并进行了一些额外的保护和优化。

线程可以配置栈的大小。 Python 有自己的栈框架结构,正如你在求值循环一章中所探讨的那样。如果存在导致递归循环的问题,并且帧执行达到深度限制,则 Python 将引发 RecursionError,你可以在 Python 代码中使用 try...except 块来处理它。

因为 pthreads 有自己的栈大小,Python 的最大深度和 pthread 的栈大小可能会冲突。如果线程栈大小小于 Python 中的最大帧深度,则整个 Python 进程将在引发 RecursionError 之前崩溃。

Python 中的最大深度可以在运行时使用 sys.setrecursionlimit() 进行配置。为避免崩溃,CPython pthread 实现将栈大小设置为解释器状态的 pythread_stacksize 值。

大多数现代 POSIX 兼容操作系统都支持 pthreads 的系统调度。如果在 pyconfig.h 中定义了 PTHREAD_SYSTEM_SCHED_SUPPORTED,则 pthread 将设置为 PTHREAD_SCOPE_SYSTEM,这意味着操作系统调度程序上线程的优先级取决于系统上的其他线程,而不仅仅是 Python 进程中的那些。

配置线程属性后,将使用 pthread_create() API 创建线程。这将从新线程内部运行引导函数。

最后,线程句柄 pthread_t 被转换为 unsigned long 并将返回值作为线程 ID。

Windows 线程

Python/thread_nt.h 中实现的 Windows 线程遵循类似但更简单的模式。

新线程的栈大小配置为解释器 pythread_stacksize 值(如果已设置)。然后使用 _beginthreadex() Windows API 创建线程,使用引导函数作为回调。最后返回线程 ID。

多线程总结

这不是关于 Python 线程的详尽教程。 Python 的线程实现非常广泛,并提供了许多机制来在线程之间共享数据、锁定对象和资源。

当 Python 应用程序受 I/O 限制时,线程是提高 Python 应用程序运行时间的一种非常有效的方法。在本节中,你了解了 GIL 是什么、它存在的原因以及标准库的哪些部分可能不受其约束。

Last updated