11.3 多进程并行

POSIX 系统为所有进程提供了一个 fork 子进程的 API。Fork 进程是对操作系统的低层次 API 调用,其可以由任何正在运行的进程执行。

进行此调用时,操作系统将克隆当前正在运行的进程的所有属性并创建一个新进程。这个克隆操作包括父进程的堆、寄存器和计数器位置。子进程可以在 fork 时从父进程读取任何变量。

在 POSIX 中 Fork 进程

例如,以“C 中的动态内存分配”开头使用的华氏度到摄氏度示例应用程序为例,你可以通过使用 fork() 把它调整为为每个华氏温度生成一个子进程,而不是按顺序计算它们。每个子进程将从该点继续运行:

cpython-book-samples/33/thread_celsius.c

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

static const double five_ninths = 5.0/9.0;

double celsius(double fahrenheit){
    return (fahrenheit - 32) * five_ninths;
}

int main(int argc, char** argv) {
    if (argc != 2)
        return -1;
    int number = atoi(argv[1]);
    for (int i = 1 ; i <= number ; i++ ) {
        double f_value = 100 + (i*10);
        pid_t child = fork();
        if (child == 0) { // Is child process
            double c_value = celsius(f_value);
            printf("%f F is %f C (pid %d)\n", f_value, c_value, getpid());
            exit(0);
        }
    }
    printf("Spawned %d processes from %d\n", number, getpid());
    return 0;
}

在命令行上运行上面的程序会得到类似这样的输出:

$ ./thread_celsius 4
110.000000 F is 43.333333 C (pid 57179)
120.000000 F is 48.888889 C (pid 57180)
Spawned 4 processes from 57178
130.000000 F is 54.444444 C (pid 57181)
140.000000 F is 60.000000 C (pid 57182)

父进程(57178)产生了四个进程。对于每个子进程,程序在 child = fork() 行处继续执行,其 child 的结果值为 0。然后它完成计算、打印值并退出进程。最后,父进程输出它产生了多少个进程和它自己的 PID。

第三个和第四个子进程完成所用的时间比父进程完成所用的时间长。这就是为什么父进程在第三个和第四个打印自己的输出之前打印最终输出的原因。

父进程可以在子进程之前以自己的退出码退出。子进程由操作系统添加到进程组中,从而更容易控制所有相关进程:

图片内容: Process Group: 进程组, Process: 进程, Stack: 栈, Heap: 堆, Process Memory: 进程内存, Python Objects: Python 对象, Instructions:指令, CPython Runtime:CPython 运行时, Compiled Modules:编译模块, Files:文件, Locks:锁, Sockets:套接字 Parent Process: 父进程

这种并行方法最大的缺点是子进程是父进程的完整副本。

对于 CPython,这意味着将运行两个 CPython 解释器,且都必须加载模块和所有库。这会产生很大的开销。当正在完成的任务的任务量超过 fork 进程的开销时,使用多个进程是有意义的。

fork 进程的另一个主要缺点是它们有一个分离于父进程的独立堆。这意味着子进程不能写入父进程的内存空间。

创建子进程后,父进程的堆可供子进程使用。为了将信息发送回父进程,则必须使用某种形式的进程间通信 (IPC)。

os 模块提供了一个包装 fork() 的包装器。

Windows 中的多进程

到目前为止,你已经了解了 POSIX 模型。 Windows 没有提供 fork() 的等价品,而 Python 应该(尽可能)在 Linux、macOS 和 Windows 上具有相同的 API。

为了克服这个问题,CreateProcessW() API 用于生成另一个带有 -c 命令行参数的 python.exe 进程。此步骤称为生成进程,也可以在 POSIX 上使用。本章中你将看到对它的引用。

multiprocessing

CPython 在操作系统 fork 进程的 API 之上提供了一个 API,这使得在 Python 中创建多进程并行变得容易。

这个 API 可以从 multiprocessing 包中获得,它提供了池化进程、队列、fork、创建共享内存堆、将进程连接在一起等扩展功能。

相关源文件

以下是与 multiprocessing 相关的源文件:

文件用途

Lib/multiprocessing

multiprocessing 包的 Python 源码

Modules/_posixsubprocess.c

包装 POSIX fork() 系统调用的 C 扩展模块

Modules/_winapi.c

包装 Windows 内核 API 的 C 扩展模块

Modules/_multiprocessing

multiprocessing 包使用的 C 扩展模块

PC/msvcrtmodule.c

Microsoft Visual C 运行时库的 Python 接口

Spawn 和 Fork 进程

multiprocessing 包提供了三种启动新的并行进程的方法:

  1. fork 一个解释器(仅限 POSIX);

  2. spawn 一个新的解释器进程(POSIX 和 Windows);

  3. 运行一个 fork 服务器,在其中创建一个新进程,然后 fork 任意数量的进程(仅限 POSIX)。

对于 Windows 和 macOS,默认的启动方法是 spawn。对于 Linux,默认方法是 fork。你可以使用 multiprocessing.set_start_method() 覆盖默认方法。

用于启动新进程的 Python API 的入参包括可调用的“target”和参数元组“args”。

以生成一个新进程将华氏温度转换为摄氏温度为例:

cpython-book-samples/33/spawn_process_celsius.py

import multiprocessing as mp
import os

def to_celsius(f):
    c = (f - 32) * (5/9)
    pid = os.getpid()
    print(f"{f}F is {c}C (pid {pid})")

if __name__ == '__main__':
    mp.set_start_method('spawn')
    p = mp.Process(target=to_celsius, args=(110,))
    p.start()

虽然你可以启动单个进程,但 multiprocessing API 假设你想要启动多个。有一些方便的方法可以生成多个进程并为它们提供数据集。其中一种方法是“Pool”类。

前面的示例可以扩展为在单独的 Python 解释器中计算一系列值:

cpython-book-samples/33/pool_process_celsius.py

import multiprocessing as mp
import os

def to_celsius(f):
    c = (f - 32) * (5/9)
    pid = os.getpid()
    print(f"{f}F is {c}C (pid {pid})")

if __name__ == '__main__':
    mp.set_start_method('spawn')
    with mp.Pool(4) as pool:
        pool.map(to_celsius, range(110, 150, 10))

请注意,执行上面示例代码的输出显示了相同的 PID。因为 CPython 解释器进程有很大的开销,所以 Pool 会将池中的每个进程视为一个 worker。如果一个 worker 已经完成,它将被重用。

你可以通过替换此行来更改该设置:

    with mp.Pool(4) as pool:

将其替换为以下代码:

    with mp.Pool(4, maxtasksperchild=1) as pool:

现在前面的多进程示例将打印类似这样的内容:

$ python pool_process_celsius.py
110F is 43.333333333333336C (pid 5654)
120F is 48.88888888888889C (pid 5653)
130F is 54.44444444444445C (pid 5652)
140F is 60.0C (pid 5655)

输出显示了新生成进程的进程 ID 和计算的值。

创建子进程

这两个脚本都将创建一个新的 Python 解释器进程并使用 pickle 将数据传递给它。

参见

pickle 模块是一个用于序列化 Python 对象的序列化包。有关详细信息,请查看 Real Python 的“The Python pickle Module: How to Persist Objects in Python.”a

a https://realpython.com/python-pickle-module/

对于 POSIX 系统,multiprocessing 模块创建子进程等价于这个命令,其中<i> 是文件句柄描述符,<j> 是管道句柄描述符:

$ python -c 'from multiprocessing.spawn import spawn_main; \
  spawn_main(tracker_fd=<i>, pipe_handle=<j>)' --multiprocessing-fork

对于 Windows 系统,使用父进程的 PID 而不是此命令中的跟踪器文件描述符,其中<k>是父进程的 PID,<j>是管道句柄描述符:

> python.exe -c 'from multiprocessing.spawn import spawn_main; \
  spawn_main(parent_pid=<k>, pipe_handle=<j>)' --multiprocessing-fork

将数据通过管道传递给子进程

当新的子进程在操作系统上被实例化后,它会等待来自父进程的初始化数据。

父进程将两个对象写入管道文件流。管道文件流是一种特殊的 I/O 流,用于在命令行上的进程之间发送数据。

父进程写入的第一个对象是准备数据对象。该对象是一个字典,其中包含一些有关父进程的信息,如执行目录、启动方法、任何特殊的命令行参数和 sys.path

下面例子中,你能看到运行 multiprocessing.spawn.get_preparation_data(name) 会生成什么:

>>> import multiprocessing.spawn
>>> import pprint
>>> pprint.pprint(multiprocessing.spawn.get_preparation_data("example"))
{'authkey': b'\x90\xaa_\x22[\x18\ri\xbcag]\x93\xfe\xf5\xe5@[wJ\x99p#\x00'
            b'\xce\xd4)1j.\xc3c',
 'dir': '/Users/anthonyshaw',
 'log_to_stderr': False,
 'name': 'example',
 'orig_dir': '/Users/anthonyshaw',
 'start_method': 'spawn',
 'sys_argv': [''],
 'sys_path': [
    '/Users/anthonyshaw',
    ]}

第二个写入的对象是 BaseProcess 子类实例。根据调用 multiprocessing 的方式和使用的操作系统,将序列化“BaseProcess”的子类之一。

准备数据和进程对象都使用 pickle 模块序列化并写入父进程的管道流:

图片内容: Parent Process:父进程, Preparation Data:准备数据, Process Object:进程对象, pickle:模块名,不译, write():方法名,不译, Pipe:管道, Worker Pool:Worker 池, Waiting:等待中, Not Created:未创建

子进程生成和序列化过程的 POSIX 实现位于 Lib/multiprocessing/popen_spawn_posix.py 中。

Windows 实现位于 Lib/multiprocessing/popen_spawn_win32.py 中。

执行子进程

子进程的入口点 multiprocessing.spawn.spawn_main() 采用参数 pipe_handle,以及用于 Windows 的 parent_pid 或用于 POSIX 的 tracked_fd

def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
    '''
    Run code specified by data received over pipe
    '''
    assert is_forking(sys.argv), "Not forking"

对于 Windows,该函数将调用父进程 PID 的 OpenProcess API。这用于创建父进程管道的文件句柄 fd

    if sys.platform == 'win32':
        import msvcrt
        import _winapi
        if parent_pid is not None:
            source_process = _winapi.OpenProcess(
                _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE,
                False, parent_pid)
        else:
            source_process = None
        new_handle = reduction.duplicate(pipe_handle,
                                         source_process=source_process)
        fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
        parent_sentinel = source_process

对于 POSIX,pipe_handle 变成文件描述符 fd,并被复制为 parent_sentinel 的值:

    else:
        from . import resource_tracker
        resource_tracker._resource_tracker._fd = tracker_fd
        fd = pipe_handle
        parent_sentinel = os.dup(pipe_handle)

接下来,使用父管道文件句柄(fd)和父进程哨兵(parent_sentinel)调用 _main()_main() 的返回值成为进程的退出代码,解释器终止:

    exitcode = _main(fd, parent_sentinel)
    sys.exit(exitcode)

使用 fdparent_sentinel 调用 _main() 是为了检查父进程是否在执行子进程时退出。

_main() 反序列化 fd 字节流上的二进制数据。请记住,这是管道文件句柄。反序列化使用与父进程的相同的 pickle 库:

图片内容: Parent Process:父进程, Waiting:等待中, Pipe:管道, read():方法名,不译, pickle:模块名,不译, Preparation Data:准备数据, Process Object:进程对象, Worker Pool:Worker 池, Initializing:初始化中, Not Created:未创建

第一个值是包含准备数据的 dict。第二个值是 SpawnProcess 的实例,该实例之后用作调用 _bootstrap()

def _main(fd, parent_sentinel):
    with os.fdopen(fd, 'rb', closefd=True) as from_parent:
        process.current_process()._inheriting = True
        try:
            preparation_data = reduction.pickle.load(from_parent)
            prepare(preparation_data)
            self = reduction.pickle.load(from_parent)
        finally:
            del process.current_process()._inheriting
    return self._bootstrap(parent_sentinel)

_bootstrap() 处理来自反序列化数据的 BaseProcess 实例的实例化,然后使用参数和关键字参数调用目标函数。最后的任务由 BaseProcess.run() 完成:

    def run(self):
        '''
        Method to be run in subprocess; can be overridden in subclass
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)

设置 self._bootstrap() 的退出码为 _main() 的退出码,子进程终止。

该进程允许父进程序列化模块和可执行函数。它还允许子进程反序列化该实例,执行带参数的函数,然后返回。

一旦子进程启动,它就不允许交换数据。此任务是使用 QueuePipe 对象的扩展来完成的。

如果在池中创建进程,则第一个进程将准备就绪并处于等待状态。父进程重复该过程并将数据发送给下一个worker:

图片内容: Parent Process:父进程, Preparation Data:准备数据, Process Object:进程对象, pickle:模块名,不译, write():方法名,不译, Pipe:管道, Worker Pool:Worker 池, Ready:准备就绪, Waiting:等待中

下一个 worker 接收数据,并初始化其状态并运行目标函数:

图片内容: Parent Process:父进程, Waiting:等待中, Pipe:管道, read():方法名,不译, pickle:模块名,不译, Preparation Data:准备数据, Process Object:进程对象, Worker Pool:Worker 池, Ready:准备就绪, Initializing:初始化中

要共享初始化之外的任何数据,必须使用队列和管道。

使用队列和管道交换数据

在上一节中,你看到了如何生成子进程,然后将管道用作序列化流来告诉子进程使用参数调用哪个函数。

根据任务的性质,进程之间有两种通信类型:队列管道。在了解它们两个之前,你将快速了解操作系统如何使用被称为 信号量 的变量来保护资源的访问。

信号量

多进程中的许多机制都使用信号量作为资源已锁定、正在等待或未使用的信号通知方式。操作系统使用二元信号量作为锁定文件、套接字等资源的简单变量类型。

如果一个进程正在写入文件或网络套接字,那么你不希望另一个进程突然开始写入同一个文件。这样数据会立即损坏。

相反,操作系统通过使用信号量来锁定资源。进程还可以发出信号,表示它们正在等待该锁被释放,这样当锁被释放时,进程会收到一条锁定资源已释放的消息,这样进程可以开始使用该资源。

在现实世界中,信号量是一种使用标志来传输消息的信号方法。因此,你可以想象资源等待、锁定和未使用状态的信号量信号如下所示:

图片内容: waiting:等待中, locked:被锁定的

信号量 API 因操作系统而异,因此有一个抽象类,multiprocessing.synchronize.Semaphore

CPython 将信号量用于多进程,因为它们既是线程安全的又是进程安全的。操作系统处理读取或写入同一信号量的任何潜在死锁。

这些信号量 API 函数的实现位于 C 扩展模块 Modules/_multiprocessing/semaphore.c 中。这个扩展模块提供了一个单一的方法来创建、锁定和释放信号量以及其他操作。

对操作系统的调用是通过一系列的宏进行的,这些宏根据不同的操作系统平台被编译成不同的实现。

对于 Windows,宏将 <winbase.h> 中的 API 函数用于信号量:

#define SEM_CREATE(name, val, max) CreateSemaphore(NULL, val, max, NULL)
#define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1)
#define SEM_GETVALUE(sem, pval) _GetSemaphoreValue(sem, pval)
#define SEM_UNLINK(name) 0

对于 POSIX,宏将 <semaphore.h> 中的 API 函数用于信号量:

#define SEM_CREATE(name, val, max) sem_open(name, O_CREAT | O_EXCL, 0600,...
#define SEM_CLOSE(sem) sem_close(sem)
#define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval)
#define SEM_UNLINK(name) sem_unlink(name)

队列

队列是在多个进程间发送、接收小数据的好方法。

你可以调整前面的多进程示例以使用 multiprocessing Manager() 实例并创建两个队列:

  1. 输入保存华氏输入值;

  2. 输出保存生成的摄氏值。

将池大小更改为 2 来获得两个 worker:

cpython-book-samples/33/pool_queue_celsius.py

import multiprocessing as mp

def to_celsius(input: mp.Queue, output: mp.Queue):
    f = input.get()
    # Time-consuming task ...
    c = (f - 32) * (5/9)
    output.put(c)

if __name__ == '__main__':
    mp.set_start_method('spawn')
    pool_manager = mp.Manager()
    with mp.Pool(2) as pool:
        inputs = pool_manager.Queue()
        outputs = pool_manager.Queue()
        input_values = list(range(110, 150, 10))
        for i in input_values:
            inputs.put(i)
            pool.apply(to_celsius, (inputs, outputs))

        for f in input_values:
            print(outputs.get(block=False))

这会将返回的元组列表打印到输出队列:

$ python pool_queue_celsius.py
43.333333333333336
48.88888888888889
54.44444444444445
60.0

父进程首先将输入值放入输入队列。然后第一个 worker 从队列中取出一个元素。每次使用 .get() 从队列中取出一个元素时,都会在队列对象上使用信号量锁:

图片内容: Parent Process:父进程, Input Queue:输入队列, Output Queue:输出队列, get():函数名,不译, Worker Pool:Worker 池

当这个 worker 忙碌时,第二个 worker 会从队列中获取另一个值:

图片内容: Busy:忙碌, 其他文字为重复内容,见上一个图片

第一个 worker 已完成计算并将结果值放入输出队列:

图片内容: put():函数名,不译, 其他文字为重复内容,见上一个图片

两个队列用于分隔输入和输出值。最终,所有输入值都已处理,并且输出队列已满。然后父进程打印这些值:

图片内容: 文字和上一个图片重复,参考上一个图片

这个例子展示了一个池的 worker 如何接收一个存储小的、离散值的队列,并并行处理它们以将结果数据发送回主机进程。

在实践中,将摄氏度转换为华氏度是一个不适合并行执行的小而琐碎的计算。如果 worker 进程正在执行不同的 CPU 密集型计算,那么这将在多 CPU 或多核计算机上提供显著的性能改进。

对于流式数据而不是离散队列,你可以改用管道。

管道

multiprocessing 包中,有一个类型 Pipe。实例化一个管道返回两个连接,一个父连接和一个子连接。两者都可以发送和接收数据:

图片内容: Parent Process:父进程, send():函数名,不译, recv():函数名,不译, Parent Pipe:父管道, Child Pipe:子管道, Worker Pool:Worker 池, Busy:忙碌

在队列示例中,当发送和接收数据时,会在队列上隐式放置一个锁。管道没有这种行为,所以你必须小心,不要让两个进程尝试在同一时间写入同一个管道。

要使最后一个示例适用于管道,需要将 pool.apply() 更改为 pool.apply_async()。这会将下一个进程的执行更改为非阻塞操作:

cpython-book-samples/33/pool_pipe_celsius.py

import multiprocessing as mp

def to_celsius(child_pipe: mp.Pipe):
    f = child_pipe.recv()
    # time-consuming task ...
    c = (f - 32) * (5/9)
    child_pipe.send(c)

if __name__ == '__main__':
    mp.set_start_method('spawn')
    pool_manager = mp.Manager()
    with mp.Pool(2) as pool:
        parent_pipe, child_pipe = mp.Pipe()
        results = []
        for input in range(110, 150, 10):
            parent_pipe.send(input)
            results.append(pool.apply_async(to_celsius, args=(child_pipe,)))
            print("Got {0:}".format(parent_pipe.recv()))
        parent_pipe.close()
        child_pipe.close()

在此行中,存在两个或多个进程尝试同时从父管道进行读取的风险:

    f = child_pipe.recv()

还有两个或多个进程试图同时写入子管道的风险:

    child_pipe.send(c)

如果发生这种情况,则数据将在接收或发送操作中损坏:

图片内容: Parent Process:父进程, send():函数名,不译, recv():函数名,不译, Parent Pipe:父管道, Child Pipe:子管道, Worker Pool:Worker 池, Busy:忙碌

为避免这种情况,你可以在操作系统上实现信号量锁。然后所有子进程将在读取或写入同一个管道之前检查锁。

需要两把锁,一把锁用在父管道的接收端,一把锁用在子管道的发送端:

cpython-book-samples/33/pool_pipe_locks_celsius.py

import multiprocessing as mp

def to_celsius(child_pipe: mp.Pipe, child_lock: mp.Lock):
    child_lock.acquire(blocking=False)
    try:
        f = child_pipe.recv()
    finally:
        child_lock.release()
    # time-consuming task ... release lock before processing
    c = (f - 32) * (5/9)
    # reacquire lock when done
    child_lock.acquire(blocking=False)
    try:
        child_pipe.send(c)
    finally:
        child_lock.release()

if __name__ == '__main__':
    mp.set_start_method('spawn')
    pool_manager = mp.Manager()
    with mp.Pool(2) as pool:
        parent_pipe, child_pipe = mp.Pipe()
        child_lock = pool_manager.Lock()
        results = []
        for i in range(110, 150, 10):
            parent_pipe.send(i)
            results.append(pool.apply_async(
                to_celsius, args=(child_pipe, child_lock)))
            print(parent_pipe.recv())
        parent_pipe.close()
        child_pipe.close()

现在 worker 进程将在接收数据之前等待获取一个锁,并且在发送数据前等待获取另一把锁:

图片内容: Parent Process:父进程, Parent Pipe:父管道, Child Pipe:子管道, send():函数名,不译, Worker Pool:Worker 池, Waiting:等待中, Busy:忙碌

此示例适用于通过管道传输的数据很大的情况,因为冲突的可能性较高。

进程之间的共享状态

到目前为止,你已经了解了如何在子进程和父进程之间共享数据。

在某些情况下,你可能希望在子进程之间共享数据。针对这种情况,multiprocessing 包提供了两种解决方案:

  1. 一种是使用共享内存映射和共享 C 类型的高性能共享内存 API;

  2. 一种是通过 Manager 类支持复杂类型的灵活服务器进程 API。

示例应用程序

作为一个演示应用程序,在本章的其余部分,你将为不同的并发和并行技术重构 TCP 端口扫描器。

通过网络,可以在编号为 1 到 65535 的端口上联系主机。公共服务具有标准端口。例如,HTTP 在 80 端口上运行,HTTPS 在 443 端口上运行。TCP 端口扫描器是一种常见的网络测试工具,用于检查数据包是否可以通过网络发送。

此示例代码使用 Queue 接口,这是一种线程安全的队列实现,类似于你在多进程示例中使用的实现。该代码还使用 socket 包尝试连接到远程端口,其超时阈值为一秒钟。

check_port() 将查看 主机 是否响应给定的 端口。如果是,check_port() 则会将端口号添加到 results 队列中。

执行脚本时,将依次为端口号 80 到 100 调用 check_port()。完成后,清空 results 队列,并将结果打印在命令行上。因此你可以比较差异,它会在最后打印执行时间:

cpython-book-samples/33/portscanner.py

from queue import Queue
import socket
import time
timeout = 1.0

def check_port(host: str, port: int, results: Queue):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex((host, port))
    if result == 0:
        results.put(port)
    sock.close()

if __name__ == '__main__':
    start = time.time()
    host = "localhost" # Replace with a host you own
    results = Queue()
    for port in range(80, 100):
        check_port(host, port, results)
    while not results.empty():
        print("Port {0} is open".format(results.get()))
    print("Completed scan in {0} seconds".format(time.time() - start))

执行此示例代码将打印出开放的端口和花费的时间:

$ python portscanner.py
Port 80 is open
Completed scan in 19.623435020446777 seconds

你可以重构此示例以使用多进程。将 Queue 接口替换为 multiprocessing.Queue ,并使用池执行器一起扫描端口:

cpython-book-samples/33/portscanner_mp_queue.py

import multiprocessing as mp
import time
import socket

timeout = 1

def check_port(host: str, port: int, results: mp.Queue):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex((host, port))
    if result == 0:
        results.put(port)
    sock.close()

if __name__ == '__main__':
    start = time.time()
    processes = []
    scan_range = range(80, 100)
    host = "localhost" # Replace with a host you own
    mp.set_start_method('spawn')
    pool_manager = mp.Manager()
    with mp.Pool(len(scan_range)) as pool:
        outputs = pool_manager.Queue()
        for port in scan_range:
            processes.append(pool.apply_async(check_port,
                                              (host, port, outputs)))
        for process in processes:
            process.get()
        while not outputs.empty():
            print("Port {0} is open".format(outputs.get()))
        print("Completed scan in {0} seconds".format(time.time() - start))

如你所料,此应用程序要快得多,因为它并行测试每个端口:

$ python portscanner_mp_queue.py
Port 80 is open
Completed scan in 1.556523084640503 seconds

多进程总结

多进程为 Python 提供了可扩展的并行执行 API。数据可以在进程之间共享,CPU 密集型工作可以分解为并行任务以利用多核或多 CPU 计算机。

当要完成的任务受 I/O 限制而不是 CPU 密集型时,多进程不是合适的解决方案。例如,如果你生成四个 worker 进程来读取和写入相同的文件,那么一个将完成所有工作,而其他三个将等待锁被释放。

多进程也不适合短期任务,因为启动新的 Python 解释器需要时间和处理开销。

在这两种情况下,你会发现接下来的方法之一更合适。

Last updated