multiprocessing
--- 基于进程的并行¶
概述¶
multiprocessing
是一个支持使用与 threading
模块类似的 API 来产生进程的包。 multiprocessing
包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。 因此,multiprocessing
模块允许程序员充分利用给定机器上的多个处理器。 它在 Unix 和 Windows 上均可运行。
multiprocessing
模块还引入了在 threading
模块中没有的API。一个主要的例子就是 Pool
对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。下面的例子演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用了 Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
将在标准输出中打印
[1, 4, 9]
Process
类¶
在 multiprocessing
中,通过创建一个 Process
对象然后调用它的 start()
方法来生成进程。 Process
和 threading.Thread
API 相同。 一个简单的多进程程序示例是:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
要显示所涉及的各个进程ID,这是一个扩展示例:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
关于为什么 if __name__ == '__main__'
部分是必需的解释,请参见 编程指导。
上下文和启动方法¶
根据不同的平台, multiprocessing
支持三种启动进程的方法。这些 启动方法 有
- spawn
父进程会启动一个全新的 python 解释器进程。 子进程将只继承那些运行进程对象的
run()
方法所必需的资源。 特别地,来自父进程的非必需文件描述符和句柄将不会被继承。 使用此方法启动进程相比使用 fork 或 forkserver 要慢上许多。可在Unix和Windows上使用。 Windows上的默认设置。
- fork
父进程使用
os.fork()
来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。只存在于Unix。Unix中的默认值。
- forkserver
程序启动并选择 forkserver 启动方法时,将启动服务器进程。 从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。 分叉服务器进程是单线程的,因此使用
os.fork()
是安全的。 没有不必要的资源被继承。可在Unix平台上使用,支持通过Unix管道传递文件描述符。
在 3.8 版更改: 对于 macOS,spawn 启动方式是默认方式。 因为 fork 可能导致subprocess崩溃,被认为是不安全的,查看 bpo-33725 。
在 3.4 版更改: 在所有unix平台上添加支持了 spawn ,并且为一些unix平台添加了 forkserver 。在Windows上子进程不再继承所有可继承的父进程句柄。
在 Unix 上通过 spawn 和 forkserver 方式启动多进程会同时启动一个 资源追踪 进程,负责追踪当前程序的进程产生的、并且不再被使用的命名系统资源(如命名信号量以及 SharedMemory
对象)。当所有进程退出后,资源追踪会负责释放这些仍被追踪的的对象。通常情况下是不会有这种对象的,但是假如一个子进程被某个信号杀死,就可能存在这一类资源的“泄露”情况。(泄露的信号量以及共享内存不会被释放,直到下一次系统重启,对于这两类资源来说,这是一个比较大的问题,因为操作系统允许的命名信号量的数量是有限的,而共享内存也会占据主内存的一片空间)
要选择一个启动方法,你应该在主模块的 if __name__ == '__main__'
子句中调用 set_start_method()
。例如:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
在程序中 set_start_method()
不应该被多次调用。
或者,你可以使用 get_context()
来获取上下文对象。上下文对象与 multiprocessing 模块具有相同的API,并允许在同一程序中使用多种启动方法。:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
请注意,对象在不同上下文创建的进程间可能并不兼容。 特别是,使用 fork 上下文创建的锁不能传递给使用 spawn 或 forkserver 启动方法启动的进程。
想要使用特定启动方法的库应该使用 get_context()
以避免干扰库用户的选择。
警告
'spawn'
和 'forkserver'
启动方法当前不能在Unix上和“冻结的”可执行内容一同使用(例如,有类似 PyInstaller 和 cx_Freeze 的包产生的二进制文件)。 'fork'
启动方法可以使用。
在进程之间交换对象¶
multiprocessing
支持进程之间的两种通信通道:
队列
Queue
类是一个近似queue.Queue
的克隆。 例如:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()队列是线程和进程安全的。
管道
Pipe()
函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()返回的两个连接对象
Pipe()
表示管道的两端。每个连接对象都有send()
和recv()
方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。
进程间同步¶
multiprocessing
包含来自 threading
的所有同步原语的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
不使用锁的情况下,来自于多进程的输出很容易产生混淆。
使用工作进程¶
Pool
类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。
例如
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
请注意,进程池的方法只能由创建它的进程使用。
注解
这个包中的功能要求子进程可以导入 __main__
模块。虽然这在 编程指导 中有描述,但还是需要提前说明一下。这意味着一些示例在交互式解释器中不起作用,比如 multiprocessing.pool.Pool
示例。例如:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(如果尝试执行上面的代码,它会以一种半随机的方式将三个完整的堆栈内容交替输出,然后你只能以某种方式停止父进程。)
参考¶
multiprocessing
包主要复制了 threading
模块的API。
Process
和异常¶
-
class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶ 进程对象表示在单独进程中运行的活动。
Process
类拥有和threading.Thread
等价的大部分方法。应始终使用关键字参数调用构造函数。 group 应该始终是
None
;它仅用于兼容threading.Thread
。 target 是由run()
方法调用的可调用对象。它默认为None
,意味着什么都没有被调用。 name 是进程名称(有关详细信息,请参阅name
)。 args 是目标调用的参数元组。 kwargs 是目标调用的关键字参数字典。如果提供,则键参数 daemon 将进程daemon
标志设置为True
或False
。如果是None
(默认值),则该标志将从创建的进程继承。默认情况下,不会将任何参数传递给 target 。
如果子类重写构造函数,它必须确保它在对进程执行任何其他操作之前调用基类构造函数(
Process.__init__()
)。在 3.3 版更改: 加入 daemon 参数。
-
run
()¶ 表示进程活动的方法。
你可以在子类中重载此方法。标准
run()
方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别从 args 和 kwargs 参数中获取顺序和关键字参数。
-
join
([timeout])¶ 如果可选参数 timeout 是
None
(默认值),则该方法将阻塞,直到调用join()
方法的进程终止。如果 timeout 是一个正数,它最多会阻塞 timeout 秒。请注意,如果进程终止或方法超时,则该方法返回None
。检查进程的exitcode
以确定它是否终止。一个进程可以被 join 多次。
进程无法join自身,因为这会导致死锁。尝试在启动进程之前join进程是错误的。
-
name
¶ 进程的名称。该名称是一个字符串,仅用于识别目的。它没有语义。可以为多个进程指定相同的名称。
初始名称由构造器设定。 如果没有为构造器提供显式名称,则会构造一个形式为 'Process-N1:N2:...:Nk' 的名称,其中每个 Nk 是其父亲的第 N 个孩子。
-
daemon
¶ 进程的守护标志,一个布尔值。这必须在
start()
被调用之前设置。初始值继承自创建进程。
当进程退出时,它会尝试终止其所有守护进程子进程。
请注意,不允许在守护进程中创建子进程。这是因为当守护进程由于父进程退出而中断时,其子进程会变成孤儿进程。 另外,这些 不是 Unix 守护进程或服务,它们是正常进程,如果非守护进程已经退出,它们将被终止(并且不被合并)。
除了
threading.Thread
API ,Process
对象还支持以下属性和方法:-
pid
¶ 返回进程ID。在生成该进程之前,这将是
None
。
-
exitcode
¶ 子进程的退出代码。如果进程尚未终止,这将是
None
。负值 -N 表示子进程被信号 N 终止。
-
authkey
¶ 进程的身份验证密钥(字节字符串)。
当
multiprocessing
初始化时,主进程使用os.urandom()
分配一个随机字符串。当创建
Process
对象时,它将继承其父进程的身份验证密钥,尽管可以通过将authkey
设置为另一个字节字符串来更改。参见 认证密码 。
-
sentinel
¶ 系统对象的数字句柄,当进程结束时将变为 "ready" 。
如果要使用
multiprocessing.connection.wait()
一次等待多个事件,可以使用此值。否则调用join()
更简单。在Windows上,这是一个操作系统句柄,可以与
WaitForSingleObject
和WaitForMultipleObjects
系列API调用一起使用。在Unix上,这是一个文件描述符,可以使用来自select
模块的原语。3.3 新版功能.
-
terminate
()¶ 终止进程。 在Unix上,这是使用
SIGTERM
信号完成的;在Windows上使用TerminateProcess()
。 请注意,不会执行退出处理程序和finally子句等。请注意,进程的后代进程将不会被终止 —— 它们将简单地变成孤立的。
警告
如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能无法被其他进程使用。类似地,如果进程已获得锁或信号量等,则终止它可能导致其他进程死锁。
-
kill
()¶ 与
terminate()
相同,但在Unix上使用SIGKILL
信号。3.7 新版功能.
-
close
()¶ 关闭
Process
对象,释放与之关联的所有资源。如果底层进程仍在运行,则会引发ValueError
。一旦close()
成功返回,Process
对象的大多数其他方法和属性将引发ValueError
。3.7 新版功能.
注意
start()
、join()
、is_alive()
、terminate()
和exitcode
方法只能由创建进程对象的进程调用。Process
一些方法的示例用法:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception
multiprocessing.
ProcessError
¶ 所有
multiprocessing
异常的基类。
-
exception
multiprocessing.
BufferTooShort
¶ 当提供的缓冲区对象太小而无法读取消息时,
Connection.recv_bytes_into()
引发的异常。如果
e
是一个BufferTooShort
实例,那么e.args[0]
将把消息作为字节字符串给出。
-
exception
multiprocessing.
AuthenticationError
¶ 出现身份验证错误时引发。
-
exception
multiprocessing.
TimeoutError
¶ 有超时的方法超时时引发。
管道和队列¶
使用多进程时,一般使用消息机制实现进程间通信,尽可能避免使用同步原语,例如锁。
消息机制包含: Pipe()
(可以用于在两个进程间传递消息),以及队列(能够在多个生产者和消费者之间通信)。
Queue
, SimpleQueue
以及 JoinableQueue
都是多生产者,多消费者,并且实现了 FIFO 的队列类型,其表现与标准库中的 queue.Queue
类相似。 不同之处在于 Queue
缺少标准库的 queue.Queue
从 Python 2.5 开始引入的 task_done()
和 join()
方法。
如果你使用了 JoinableQueue
,那么你 必须 对每个已经移出队列的任务调用 JoinableQueue.task_done()
。 不然的话用于统计未完成任务的信号量最终会溢出并抛出异常。
另外还可以通过使用一个管理器对象创建一个共享队列,详见 管理器 。
注解
multiprocessing
使用了普通的 queue.Empty
和 queue.Full
异常去表示超时。 你需要从 queue
中导入它们,因为它们并不在 multiprocessing
的命名空间中。
注解
当一个对象被放入一个队列中时,这个对象首先会被一个后台线程用 pickle 序列化,并将序列化后的数据通过一个底层管道的管道传递到队列中。 这种做法会有点让人惊讶,但一般不会出现什么问题。 如果它们确实妨碍了你,你可以使用一个由管理器 manager 创建的队列替换它。
将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法
empty()
才会返回False
。而get_nowait()
可以不抛出queue.Empty
直接返回。如果有多个进程同时将对象放入队列,那么在队列的另一端接受到的对象可能是无序的。但是由同一个进程放入的多个对象的顺序在另一端输出时总是一样的。
警告
如果一个进程在尝试使用 Queue
期间被 Process.terminate()
或 os.kill()
调用终止了,那么队列中的数据很可能被破坏。 这可能导致其他进程在尝试使用该队列时发生异常。
警告
正如刚才提到的,如果一个子进程将一些对象放进队列中 (并且它没有用 JoinableQueue.cancel_join_thread
方法),那么这个进程在所有缓冲区的对象被刷新进管道之前,是不会终止的。
这意味着,除非你确定所有放入队列中的对象都已经被消费了,否则如果你试图等待这个进程,你可能会陷入死锁中。相似地,如果该子进程不是后台进程,那么父进程可能在试图等待所有非后台进程退出时挂起。
注意用管理器创建的队列不存在这个问题,详见 编程指导 。
该 例子 展示了如何使用队列实现进程间通信。
-
multiprocessing.
Pipe
([duplex])¶ 返回一对
Connection`对象 ``(conn1, conn2)`
, 分别表示管道的两端。如果 duplex 被置为
True
(默认值),那么该管道是双向的。如果 duplex 被置为False
,那么该管道是单向的,即conn1
只能用于接收消息,而conn2
仅能用于发送消息。
-
class
multiprocessing.
Queue
([maxsize])¶ 返回一个使用一个管道和少量锁和信号量实现的共享队列实例。当一个进程将一个对象放进队列中时,一个写入线程会启动并将对象从缓冲区写入管道中。
一旦超时,将抛出标准库
queue
模块中常见的异常queue.Empty
和queue.Full
。除了
task_done()
和join()
之外,Queue
实现了标准库类queue.Queue
中所有的方法。-
qsize
()¶ 返回队列的大致长度。由于多线程或者多进程的上下文,这个数字是不可靠的。
注意,在 Unix 平台上,例如 Mac OS X ,这个方法可能会抛出
NotImplementedError
异常,因为该平台没有实现sem_getvalue()
。
-
empty
()¶ 如果队列是空的,返回
True
,反之返回False
。 由于多线程或多进程的环境,该状态是不可靠的。
-
full
()¶ 如果队列是满的,返回
True
,反之返回False
。 由于多线程或多进程的环境,该状态是不可靠的。
-
put
(obj[, block[, timeout]])¶ 将 obj 放入队列。如果可选参数 block 是
True
(默认值) 而且 timeout 是None
(默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出queue.Full
异常。反之 (block 是False
时),仅当有可用缓冲槽时才放入对象,否则抛出queue.Full
异常 (在这种情形下 timeout 参数会被忽略)。在 3.8 版更改: 如果队列已经关闭,会抛出
ValueError
而不是AssertionError
。
-
put_nowait
(obj)¶ 相当于
put(obj, False)
。
-
get
([block[, timeout]])¶ 从队列中取出并返回对象。如果可选参数 block 是
True
(默认值) 而且 timeout 是None
(默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出queue.Empty
异常。反之 (block 是False
时),仅当有可用对象能够取出时返回,否则抛出queue.Empty
异常 (在这种情形下 timeout 参数会被忽略)。在 3.8 版更改: 如果队列已经关闭,会抛出
ValueError
而不是OSError
。
-
get_nowait
()¶ 相当于
get(False)
。
multiprocessing.Queue
类有一些在queue.Queue
类中没有出现的方法。这些方法在大多数情形下并不是必须的。-
close
()¶ 指示当前进程将不会再往队列中放入对象。一旦所有缓冲区中的数据被写入管道之后,后台的线程会退出。这个方法在队列被gc回收时会自动调用。
-
join_thread
()¶ 等待后台线程。这个方法仅在调用了
close()
方法之后可用。这会阻塞当前进程,直到后台线程退出,确保所有缓冲区中的数据都被写入管道中。默认情况下,如果一个不是队列创建者的进程试图退出,它会尝试等待这个队列的后台线程。这个进程可以使用
cancel_join_thread()
让join_thread()
方法什么都不做直接跳过。
-
cancel_join_thread
()¶ 防止
join_thread()
方法阻塞当前进程。具体而言,这防止进程退出时自动等待后台线程退出。详见join_thread()
。可能这个方法称为”
allow_exit_without_flush()
“ 会更好。这有可能会导致正在排队进入队列的数据丢失,大多数情况下你不需要用到这个方法,仅当你不关心底层管道中可能丢失的数据,只是希望进程能够马上退出时使用。
注解
该类的功能依赖于宿主操作系统具有可用的共享信号量实现。否则该类将被禁用,任何试图实例化一个
Queue
对象的操作都会抛出ImportError
异常,更多信息详见 bpo-3770 。后续说明的任何专用队列对象亦如此。-
-
class
multiprocessing.
SimpleQueue
¶ 这是一个简化的
Queue
类的实现,很像带锁的Pipe
。-
empty
()¶ 如果队列为空返回
True
,否则返回False
。
-
get
()¶ 从队列中移出并返回一个对象。
-
put
(item)¶ 将 item 放入队列。
-
-
class
multiprocessing.
JoinableQueue
([maxsize])¶ JoinableQueue
类是Queue
的子类,额外添加了task_done()
和join()
方法。-
task_done
()¶ 指出之前进入队列的任务已经完成。由队列的消费者进程使用。对于每次调用
get()
获取的任务,执行完成后调用task_done()
告诉队列该任务已经处理完成。如果
join()
方法正在阻塞之中,该方法会在所有对象都被处理完的时候返回 (即对之前使用put()
放进队列中的所有对象都已经返回了对应的task_done()
) 。如果被调用的次数多于放入队列中的项目数量,将引发
ValueError
异常 。
-
join
()¶ 阻塞至队列中所有的元素都被接收和处理完毕。
当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者进程调用
task_done()
表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候,join()
阻塞被解除。
-
杂项¶
-
multiprocessing.
active_children
()¶ 返回当前进程存活的子进程的列表。
调用该方法有“等待”已经结束的进程的副作用。
-
multiprocessing.
cpu_count
()¶ 返回系统的CPU数量。
该数量不同于当前进程可以使用的CPU数量。可用的CPU数量可以由
len(os.sched_getaffinity(0))
方法获得。可能引发
NotImplementedError
。
-
multiprocessing.
parent_process
()¶ 返回父进程
Process
对象,和父进程调用current_process()
返回的对象一样。如果一个进程已经是主进程,parent_process
会返回None
.3.8 新版功能.
-
multiprocessing.
freeze_support
()¶ 为使用了
multiprocessing
的程序,提供冻结以产生 Windows 可执行文件的支持。(在 py2exe, PyInstaller 和 cx_Freeze 上测试通过)需要在 main 模块的
if __name__ == '__main__'
该行之后马上调用该函数。例如:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
如果没有调用
freeze_support()
在尝试运行被冻结的可执行文件时会抛出RuntimeError
异常。对
freeze_support()
的调用在非 Windows 平台上是无效的。如果该模块在 Windows 平台的 Python 解释器中正常运行 (该程序没有被冻结), 调用``freeze_support()`` 也是无效的。
-
multiprocessing.
get_all_start_methods
()¶ 返回支持的启动方法的列表,该列表的首项即为默认选项。可能的启动方法有
'fork'
,'spawn'
和``'forkserver'。在 Windows 中,只有 ``'spawn'
是可用的。Unix平台总是支持``'fork'`` 和``'spawn',且 ``'fork'
是默认值。3.4 新版功能.
-
multiprocessing.
get_context
(method=None)¶ 返回一个 Context 对象。该对象具有和
multiprocessing
模块相同的API。如果 method 设置成
None
那么将返回默认上下文对象。否则 method 应该是'fork'
,'spawn'
,'forkserver'
。 如果指定的启动方法不存在,将抛出ValueError
异常。3.4 新版功能.
-
multiprocessing.
get_start_method
(allow_none=False)¶ 返回启动进程时使用的启动方法名。
如果启动方法已经固定,并且 allow_none 被设置成 False ,那么启动方法将被固定为默认的启动方法,并且返回其方法名。如果启动方法没有设定,并且 allow_none 被设置成 True ,那么将返回
None
。The return value can be
'fork'
,'spawn'
,'forkserver'
orNone
.'fork'
is the default on Unix, while'spawn'
is the default on Windows and macOS.
在 3.8 版更改: 对于 macOS,spawn 启动方式是默认方式。 因为 fork 可能导致subprocess崩溃,被认为是不安全的,查看 bpo-33725 。
3.4 新版功能.
-
multiprocessing.
set_executable
()¶ 设置在启动子进程时使用的 Python 解释器路径。 ( 默认使用
sys.executable
) 嵌入式编程人员可能需要这样做:set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
以使他们可以创建子进程。
在 3.4 版更改: 现在在 Unix 平台上使用
'spawn'
启动方法时支持调用该方法。
-
multiprocessing.
set_start_method
(method)¶ 设置启动子进程的方法。 method 可以是
'fork'
,'spawn'
或者'forkserver'
。注意这最多只能调用一次,并且需要藏在 main 模块中,由
if __name__ == '__main__'
保护着。3.4 新版功能.
注解
multiprocessing
并没有包含类似 threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
, 或者 threading.local
的方法和类。
连接对象(Connection)¶
Connection 对象允许收发可以序列化的对象或字符串。它们可以看作面向消息的连接套接字。
通常使用 Pipe
创建 Connection 对象。详见 : 监听器及客户端.
-
class
multiprocessing.connection.
Connection
¶ -
send
(obj)¶ 将一个对象发送到连接的另一端,可以用
recv()
读取。发送的对象必须是可以序列化的,过大的对象 ( 接近 32MiB+ ,这个值取决于操作系统 ) 有可能引发
ValueError
异常。
-
fileno
()¶ 返回由连接对象使用的描述符或者句柄。
-
close
()¶ 关闭连接对象。
当连接对象被垃圾回收时会自动调用。
-
poll
([timeout])¶ 返回连接对象中是否有可以读取的数据。
如果未指定 timeout ,此方法会马上返回。如果 timeout 是一个数字,则指定了最大阻塞的秒数。如果 timeout 是
None
,那么将一直等待,不会超时。注意通过使用
multiprocessing.connection.wait()
可以一次轮询多个连接对象。
-
send_bytes
(buffer[, offset[, size]])¶ 从一个 bytes-like object 对象中取出字节数组并作为一条完整消息发送。
如果由 offset 给定了在 buffer 中读取数据的位置。 如果给定了 size ,那么将会从缓冲区中读取多个字节。 过大的缓冲区 ( 接近 32MiB+ ,此值依赖于操作系统 ) 有可能引发
ValueError
异常。
-
recv_bytes
([maxlength])¶ 以字符串形式返回一条从连接对象另一端发送过来的字节数据。此方法在接收到数据前将一直阻塞。 如果连接对象被对端关闭或者没有数据可读取,将抛出
EOFError
异常。如果给定了 maxlength 并且消息长于 maxlength 那么将抛出
OSError
并且该连接对象将不再可读。
-
recv_bytes_into
(buffer[, offset])¶ 将一条完整的字节数据消息读入 buffer 中并返回消息的字节数。 此方法在接收到数据前将一直阻塞。 如果连接对象被对端关闭或者没有数据可读取,将抛出
EOFError
异常。buffer must be a writable bytes-like object. If offset is given then the message will be written into the buffer from that position. Offset must be a non-negative integer less than the length of buffer (in bytes).
如果缓冲区太小,则将引发
BufferTooShort
异常,并且完整的消息将会存放在异常实例e
的e.args[0]
中。
在 3.3 版更改: 现在连接对象自身可以通过
Connection.send()
和Connection.recv()
在进程之间传递。3.3 新版功能: 连接对象现已支持上下文管理协议 -- 参见 see 上下文管理器类型 。
__enter__()
返回连接对象,__exit__()
会调用close()
。-
例如:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv()
方法会自动解封它收到的数据,除非你能够信任发送消息的进程,否则此处可能有安全风险。
因此, 除非连接对象是由 Pipe()
产生的,否则你应该仅在使用了某种认证手段之后才使用 recv()
和 send()
方法。 参考 认证密码。
警告
如果一个进程在试图读写管道时被终止了,那么管道中的数据很可能是不完整的,因为此时可能无法确定消息的边界。
同步原语¶
通常来说同步原语在多进程环境中并不像它们在多线程环境中那么必要。参考 threading
模块的文档。
注意可以使用管理器对象创建同步原语,参考 管理器 。
-
class
multiprocessing.
Barrier
(parties[, action[, timeout]])¶ 类似
threading.Barrier
的栅栏对象。3.3 新版功能.
-
class
multiprocessing.
BoundedSemaphore
([value])¶ 非常类似
threading.BoundedSemaphore
的有界信号量对象。一个小小的不同在于,它的
acquire
方法的第一个参数名是和Lock.acquire()
一样的 block 。注解
在 Mac OS X 平台上, 该对象于
Semaphore
不同在于sem_getvalue()
方法并没有在该平台上实现。
-
class
multiprocessing.
Condition
([lock])¶ 条件变量:
threading.Condition
的别名。指定的 lock 参数应该是
multiprocessing
模块中的Lock
或者RLock
对象。在 3.3 版更改: 新增了
wait_for()
方法。
-
class
multiprocessing.
Event
¶ A clone of
threading.Event
.
-
class
multiprocessing.
Lock
¶ 原始锁(非递归锁)对象,类似于
threading.Lock
。一旦一个进程或者线程拿到了锁,后续的任何其他进程或线程的其他请求都会被阻塞直到锁被释放。任何进程或线程都可以释放锁。除非另有说明,否则multiprocessing.Lock
用于进程或者线程的概念和行为都和threading.Lock
一致。注意
Lock
实际上是一个工厂函数。它返回由默认上下文初始化的multiprocessing.synchronize.Lock
对象。Lock
supports the context manager protocol and thus may be used inwith
statements.-
acquire
(block=True, timeout=None)¶ 可以阻塞或非阻塞地获得锁。
如果 block 参数被设为
True
( 默认值 ) , 对该方法的调用在锁处于释放状态之前都会阻塞,然后将锁设置为锁住状态并返回True
。需要注意的是第一个参数名与threading.Lock.acquire()
的不同。如果 block 参数被设置成
False
,方法的调用将不会阻塞。 如果锁当前处于锁住状态,将返回False
; 否则将锁设置成锁住状态,并返回True
。当 timeout 是一个正浮点数时,会在等待锁的过程中最多阻塞等待 timeout 秒,当 timeout 是负数时,效果和 timeout 为0时一样,当 timeout 是
None
(默认值)时,等待时间是无限长。需要注意的是,对于 timeout 参数是负数和None
的情况, 其行为与threading.Lock.acquire()
是不一样的。当 block 参数 为False
时, timeout 并没有实际用处,会直接忽略。否则,函数会在拿到锁后返回True
或者 超时没拿到锁后返回False
。
-
release
()¶ 释放锁,可以在任何进程、线程使用,并不限于锁的拥有者。
当尝试释放一个没有被持有的锁时,会抛出
ValueError
异常,除此之外其行为与threading.Lock.release()
一样。
-
-
class
multiprocessing.
RLock
¶ 递归锁对象: 类似于
threading.RLock
。递归锁必须由持有线程、进程亲自释放。如果某个进程或者线程拿到了递归锁,这个进程或者线程可以再次拿到这个锁而不需要等待。但是这个进程或者线程的拿锁操作和释放锁操作的次数必须相同。注意
RLock
是一个工厂函数,调用后返回一个使用默认 context 初始化的multiprocessing.synchronize.RLock
实例。RLock
支持 context manager 协议,因此可在with
语句内使用。-
acquire
(block=True, timeout=None)¶ 可以阻塞或非阻塞地获得锁。
当 block 参数设置为
True
时,会一直阻塞直到锁处于空闲状态(没有被任何进程、线程拥有),除非当前进程或线程已经拥有了这把锁。然后当前进程/线程会持有这把锁(在锁没有其他持有者的情况下),锁内的递归等级加一,并返回True
. 注意, 这个函数第一个参数的行为和threading.RLock.acquire()
的实现有几个不同点,包括参数名本身。当 block 参数是
False
, 将不会阻塞,如果此时锁被其他进程或者线程持有,当前进程、线程获取锁操作失败,锁的递归等级也不会改变,函数返回False
, 如果当前锁已经处于释放状态,则当前进程、线程则会拿到锁,并且锁内的递归等级加一,函数返回True
。timeout 参数的使用方法及行为与
Lock.acquire()
一样。但是要注意 timeout 的其中一些行为和threading.RLock.acquire()
中实现的行为是不同的。
-
release
()¶ 释放锁,使锁内的递归等级减一。如果释放后锁内的递归等级降低为0,则会重置锁的状态为释放状态(即没有被任何进程、线程持有),重置后如果有有其他进程和线程在等待这把锁,他们中的一个会获得这个锁而继续运行。如果释放后锁内的递归等级还没到达0,则这个锁仍将保持未释放状态且当前进程和线程仍然是持有者。
只有当前进程或线程是锁的持有者时,才允许调用这个方法。如果当前进程或线程不是这个锁的拥有者,或者这个锁处于已释放的状态(即没有任何拥有者),调用这个方法会抛出
AssertionError
异常。注意这里抛出的异常类型和threading.RLock.release()
中实现的行为不一样。
-
-
class
multiprocessing.
Semaphore
([value])¶ 一种信号量对象: 类似于
threading.Semaphore
.一个小小的不同在于,它的
acquire
方法的第一个参数名是和Lock.acquire()
一样的 block 。
注解
在 Mac OS X 上,不支持 sem_timedwait
,所以,调用 acquire()
时如果使用 timeout 参数,会通过循环sleep来模拟这个函数的行为。
注解
假如信号 SIGINT 是来自于 Ctrl-C ,并且主线程被 BoundedSemaphore.acquire()
, Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
或 Condition.wait()
阻塞,则调用会立即中断同时抛出 KeyboardInterrupt
异常。
这和 threading
的行为不同,此模块中当执行对应的阻塞式调用时,SIGINT 会被忽略。
注解
这个包的某些功能依赖于宿主机系统的共享信号量的实现,如果系统没有这个特性, multiprocessing.synchronize
会被禁用,尝试导入这个模块会引发 ImportError
异常,详细信息请查看 bpo-3770 。
管理器¶
管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理 共享对象 的服务。其他进程可以通过代理访问这些共享对象。
返回一个已启动的
SyncManager
管理器对象,这个对象可以用于在不同进程中共享数据。返回的管理器对象对应了一个已经启动的子进程,并且拥有一系列方法可以用于创建共享对象、返回对应的代理。
当管理器被垃圾回收或者父进程退出时,管理器进程会立即退出。管理器类定义在 multiprocessing.managers
模块:
-
class
multiprocessing.managers.
BaseManager
([address[, authkey]])¶ 创建一个 BaseManager 对象。
一旦创建,应该及时调用
start()
或者get_server().serve_forever()
以确保管理器对象对应的管理进程已经启动。address 是管理器服务进程监听的地址。如果 address 是
None
,则允许和任意主机的请求建立连接。authkey 是认证标识,用于检查连接服务进程的请求合法性。如果 authkey 是
None
, 则会使用current_process().authkey
, 否则,就使用 authkey , 需要保证它必须是 byte 类型的字符串。-
start
([initializer[, initargs]])¶ 为管理器开启一个子进程,如果 initializer 不是
None
, 子进程在启动时将会调用initializer(*initargs)
。
-
get_server
()¶ 返回一个
Server
对象,它是管理器在后台控制的真实的服务。Server
对象拥有serve_forever()
方法。>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
额外拥有一个address
属性。
-
connect
()¶ 将本地管理器对象连接到一个远程管理器进程:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
-
register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ 一个 classmethod,可以将一个类型或者可调用对象注册到管理器类。
typeid 是一种 "类型标识符",用于唯一表示某种共享对象类型,必须是一个字符串。
callable 是一个用来为此类型标识符创建对象的可调用对象。如果一个管理器实例将使用
connect()
方法连接到服务器,或者 create_method 参数为False
,那么这里可留下None
。proxytype 是
BaseProxy
的子类,可以根据 typeid 为共享对象创建一个代理,如果是None
, 则会自动创建一个代理类。exposed 是一个函数名组成的序列,用来指明只有这些方法可以使用
BaseProxy._callmethod()
代理。(如果 exposed 是None
, 则会在proxytype._exposed_
存在的情况下转而使用它) 当暴露的方法列表没有指定的时候,共享对象的所有 “公共方法” 都会被代理。(这里的“公共方法”是指所有拥有__call__()
方法并且不是以'_'
开头的属性)method_to_typeid 是一个映射,用来指定那些应该返回代理对象的暴露方法所返回的类型。(如果 method_to_typeid 是
None
, 则proxytype._method_to_typeid_
会在存在的情况下被使用)如果方法名称不在这个映射中或者映射是None
,则方法返回的对象会是一个值拷贝。create_method 指明,是否要创建一个以 typeid 命名并返回一个代理对象的方法,这个函数会被服务进程用于创建共享对象,默认为
True
。
BaseManager
实例也有一个只读属性。-
address
¶ 管理器所用的地址。
在 3.3 版更改: 管理器对象支持上下文管理协议 - 查看 上下文管理器类型 。
__enter__()
启动服务进程(如果它还没有启动)并且返回管理器对象,__exit__()
会调用shutdown()
。在之前的版本中,如果管理器服务进程没有启动,
__enter__()
不会负责启动它。-
-
class
multiprocessing.managers.
SyncManager
¶ BaseManager
的子类,可用于进程的同步。这个类型的对象使用multiprocessing.Manager()
创建。它拥有一系列方法,可以为大部分常用数据类型创建并返回 代理对象 代理,用于进程间同步。甚至包括共享列表和字典。
-
Barrier
(parties[, action[, timeout]])¶ 创建一个共享的
threading.Barrier
对象并返回它的代理。3.3 新版功能.
-
BoundedSemaphore
([value])¶ 创建一个共享的
threading.BoundedSemaphore
对象并返回它的代理。
-
Condition
([lock])¶ 创建一个共享的
threading.Condition
对象并返回它的代理。如果提供了 lock 参数,那它必须是
threading.Lock
或threading.RLock
的代理对象。在 3.3 版更改: 新增了
wait_for()
方法。
-
Event
()¶ 创建一个共享的
threading.Event
对象并返回它的代理。
-
Lock
()¶ 创建一个共享的
threading.Lock
对象并返回它的代理。
-
Queue
([maxsize])¶ 创建一个共享的
queue.Queue
对象并返回它的代理。
-
RLock
()¶ 创建一个共享的
threading.RLock
对象并返回它的代理。
-
Semaphore
([value])¶ 创建一个共享的
threading.Semaphore
对象并返回它的代理。
-
Array
(typecode, sequence)¶ 创建一个数组并返回它的代理。
-
Value
(typecode, value)¶ 创建一个具有可写
value
属性的对象并返回它的代理。
在 3.6 版更改: 共享对象能够嵌套。例如, 共享的容器对象如共享列表,可以包含另一个共享对象,他们全都会在
SyncManager
中进行管理和同步。-
-
class
multiprocessing.managers.
Namespace
¶ 一个可以注册到
SyncManager
的类型。命名空间对象没有公共方法,但是拥有可写的属性。直接print会显示所有属性的值。
值得一提的是,当对命名空间对象使用代理的时候,访问所有名称以
'_'
开头的属性都只是代理器上的属性,而不是命名空间对象的属性。>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
自定义管理器¶
要创建一个自定义的管理器,需要新建一个 BaseManager
的子类,然后使用这个管理器类上的 register()
类方法将新类型或者可调用方法注册上去。例如:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
使用远程管理器¶
可以将管理器服务运行在一台机器上,然后使用客户端从其他机器上访问。(假设它们的防火墙允许)
运行下面的代码可以启动一个服务,此付包含了一个共享队列,允许远程客户端访问:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
远程客户端可以通过下面的方式访问服务:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
也可以通过下面的方式:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地进程也可以访问这个队列,利用上面的客户端代码通过远程方式访问:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
代理对象¶
代理是一个 指向 其他共享对象的对象,这个对象(很可能)在另外一个进程中。共享对象也可以说是代理 指涉 的对象。多个代理对象可能指向同一个指涉对象。
代理对象代理了指涉对象的一系列方法调用(虽然并不是指涉对象的每个方法都有必要被代理)。通过这种方式,代理的使用方法可以和它的指涉对象一样:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
注意,对代理使用 str()
函数会返回指涉对象的字符串表示,但是 repr()
却会返回代理本身的内部字符串表示。
被代理的对象很重要的一点是必须可以被序列化,这样才能允许他们在进程间传递。因此,指涉对象可以包含 代理对象 。这允许管理器中列表、字典或者其他 代理对象 对象之间的嵌套。
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
类似地,字典和列表代理也可以相互嵌套:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
如果指涉对象包含了普通 list
或 dict
对象,对这些内部可变对象的修改不会通过管理器传播,因为代理无法得知被包含的值什么时候被修改了。但是把存放在容器代理中的值本身是会通过管理器传播的(会触发代理对象中的 __setitem__
)从而有效修改这些对象,所以可以把修改过的值重新赋值给容器代理:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
在大多是使用情形下,这种实现方式并不比嵌套 代理对象 方便,但是依然演示了对于同步的一种控制级别。
注解
multiprocessing
中的代理类并没有提供任何对于代理值比较的支持。所以,我们会得到如下结果:
>>> manager.list([1,2,3]) == [1,2,3]
False
当需要比较值的时候,应该替换为使用指涉对象的拷贝。
-
class
multiprocessing.managers.
BaseProxy
¶ 代理对象是
BaseProxy
派生类的实例。-
_callmethod
(methodname[, args[, kwds]])¶ 调用指涉对象的方法并返回结果。
如果
proxy
是一个代理且其指涉的是obj
, 那么下面的表达式:proxy._callmethod(methodname, args, kwds)
相当于求取以下表达式的值:
getattr(obj, methodname)(*args, **kwds)
于管理器进程。
返回结果会是一个值拷贝或者一个新的共享对象的代理 - 见函数
BaseManager.register()
中关于参数 method_to_typeid 的文档。如果这个调用熬出了异常,则这个异常会被
_callmethod()
透传出来。如果是管理器进程本身抛出的一些其他异常,则会被_callmethod()
转换为RemoteError
异常重新抛出。特别注意,如果 methodname 没有 暴露 出来,将会引发一个异常。
_callmethod()
的一个使用示例:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue
()¶ 返回指涉对象的一份拷贝。
如果指涉对象无法序列化,则会抛出一个异常。
-
__repr__
()¶ 返回代理对象的内部字符串表示。
-
__str__
()¶ 返回指涉对象的内部字符串表示。
-
清理¶
代理对象使用了一个弱引用回调函数,当它被垃圾回收时,会将自己从拥有此指涉对象的管理器上反注册,
当共享对象没有被任何代理器引用时,会被管理器进程删除。
进程池¶
可以创建一个进程池,它将使用 Pool
类执行提交给它的任务。
-
class
multiprocessing.pool.
Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶ 一个进程池对象,它控制可以提交作业的工作进程池。它支持带有超时和回调的异步结果,以及一个并行的 map 实现。
processes 是要使用的工作进程数目。如果 processes 为
None
,则使用os.cpu_count()
返回的值。如果 initializer 不为
None
,则每个工作进程将会在启动时调用initializer(*initargs)
。maxtasksperchild 是一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量,为了释放未使用的资源。默认的 maxtasksperchild 是
None
,意味着工作进程寿与池齐。context 可被用于指定启动的工作进程的上下文。通常一个进程池是使用函数
multiprocessing.Pool()
或者一个上下文对象的Pool()
方法创建的。在这两种情况下, context 都是适当设置的。注意,进程池对象的方法只有创建它的进程能够调用。
警告
multiprocessing.pool
对象具有需要正确管理的内部资源 (像任何其他资源一样),具体方式是将进程池用作上下文管理器,或者手动调用close()
和terminate()
。 未做此类操作将导致进程在终结阶段挂起。请注意依赖垃圾回收器来销毁进程池是 不正确 的做法,因为 CPython 并不保证会调用进程池终结程序(请参阅
object.__del__()
来了解详情)。3.2 新版功能: maxtasksperchild
3.4 新版功能: context
注解
通常来说,
Pool
中的 Worker 进程的生命周期和进程池的工作队列一样长。一些其他系统中(如 Apache, mod_wsgi 等)也可以发现另一种模式,他们会让工作进程在完成一些任务后退出,清理、释放资源,然后启动一个新的进程代替旧的工作进程。Pool
的 maxtasksperchild 参数给用户提供了这种能力。-
apply
(func[, args[, kwds]])¶ 使用 args 参数以及 kwds 命名参数调用 func , 它会返回结果前阻塞。这种情况下,
apply_async()
更适合并行化工作。另外 func 只会在一个进程池中的一个工作进程中执行。
-
apply_async
(func[, args[, kwds[, callback[, error_callback]]]])¶ apply()
方法的一个变种,返回一个AsyncResult
对象。如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback 。
如果指定了 error_callback , 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时, 会将抛出的异常对象作为参数传递给 error_callback 执行。
回调函数应该立即执行完成,否则会阻塞负责处理结果的线程。
-
map
(func, iterable[, chunksize])¶ 内置
map()
函数的并行版本 (但它只支持一个 iterable 参数,对于多个可迭代对象请参阅starmap()
)。 它会保持阻塞直到获得结果。这个方法会将可迭代对象分割为许多块,然后提交给进程池。可以将 chunksize 设置为一个正整数从而(近似)指定每个块的大小可以。
注意对于很长的迭代对象,可能消耗很多内存。可以考虑使用
imap()
或imap_unordered()
并且显示指定 chunksize 以提升效率。
-
map_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ map()
方法的一个变种,返回一个AsyncResult
对象。如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback 。
如果指定了 error_callback , 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时, 会将抛出的异常对象作为参数传递给 error_callback 执行。
回调函数应该立即执行完成,否则会阻塞负责处理结果的线程。
-
imap
(func, iterable[, chunksize])¶ map()
的延迟执行版本。chunksize 参数的作用和
map()
方法的一样。对于很长的迭代器,给 chunksize 设置一个很大的值会比默认值1
极大 地加快执行速度。同样,如果 chunksize 是
1
, 那么imap()
方法所返回的迭代器的next()
方法拥有一个可选的 timeout 参数: 如果无法在 timeout 秒内执行得到结果,则``next(timeout)`` 会抛出multiprocessing.TimeoutError
异常。
-
imap_unordered
(func, iterable[, chunksize])¶ 和
imap()
相同,只不过通过迭代器返回的结果是任意的。(当进程池中只有一个工作进程的时候,返回结果的顺序才能认为是"有序"的)
-
starmap
(func, iterable[, chunksize])¶ 和
map()
类似,不过 iterable 中的每一项会被解包再作为函数参数。比如可迭代对象
[(1,2), (3, 4)]
会转化为等价于[func(1,2), func(3,4)]
的调用。3.3 新版功能.
-
starmap_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ 相当于
starmap()
与map_async()
的结合,迭代 iterable 的每一项,解包作为 func 的参数并执行,返回用于获取结果的对象。3.3 新版功能.
-
close
()¶ 阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出。
-
terminate
()¶ 不必等待未完成的任务,立即停止工作进程。当进程池对象被垃圾回收时,会立即调用
terminate()
。
-
join
()¶ 等待工作进程结束。调用
join()
前必须先调用close()
或者terminate()
。
3.3 新版功能: 进程池对象现在支持上下文管理器协议 - 参见 上下文管理器类型 。
__enter__()
返回进程池对象,__exit__()
会调用terminate()
。-
-
class
multiprocessing.pool.
AsyncResult
¶ Pool.apply_async()
和Pool.map_async()
返回对象所属的类。-
get
([timeout])¶ 用于获取执行结果。如果 timeout 不是
None
并且在 timeout 秒内仍然没有执行完得到结果,则抛出multiprocessing.TimeoutError
异常。如果远程调用发生异常,这个异常会通过get()
重新抛出。
-
wait
([timeout])¶ 阻塞,直到返回结果,或者 timeout 秒后超时。
-
ready
()¶ 返回执行状态,是否已经完成。
-
successful
()¶ 判断调用是否已经完成并且未引发异常。 如果还未获得结果则将引发
ValueError
。在 3.7 版更改: 如果没有执行完,会抛出
ValueError
异常而不是AssertionError
。
-
下面的例子演示了进程池的用法:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
监听器及客户端¶
通常情况下,进程间通过队列或者 Pipe()
返回的 Connection
传递消息。
不过,multiprocessing.connection
模块其实提供了一些更灵活的特性。最基础的用法是通过它抽象出来的高级API来操作socket或者Windows命名管道。也提供一些高级用法,如通过 hmac
模块来支持 摘要认证,以及同时监听多个管道连接。
-
multiprocessing.connection.
deliver_challenge
(connection, authkey)¶ 发送一个随机生成的消息到另一端,并等待回复。
如果收到的回复与使用 authkey 作为键生成的信息摘要匹配成功,就会发送一个欢迎信息给管道另一端。否则抛出
AuthenticationError
异常。
-
multiprocessing.connection.
answer_challenge
(connection, authkey)¶ 接收一条信息,使用 authkey 作为键计算信息摘要,然后将摘要发送回去。
如果没有收到欢迎消息,就抛出
AuthenticationError
异常。
-
multiprocessing.connection.
Client
(address[, family[, authkey]])¶ 尝试使用 address 地址上的监听器建立一个连接,返回
Connection
。连接的类型取决于 family 参数,但是通常可以省略,因为可以通过 address 的格式推导出来。(查看 地址格式 )
如果提供了 authkey 参数并且不是 None,那它必须是一个字符串并且会被当做基于 HMAC 认证的密钥。如果 authkey 是None 则不会有认证行为。认证失败抛出
AuthenticationError
异常,请查看 See 认证密码 。
-
class
multiprocessing.connection.
Listener
([address[, family[, backlog[, authkey]]]])¶ 可以监听连接请求,是对于绑定套接字或者 Windows 命名管道的封装。
address 是监听器对象中的绑定套接字或命名管道使用的地址。
注解
如果使用 '0.0.0.0' 作为监听地址,那么在Windows上这个地址无法建立连接。想要建立一个可连接的端点,应该使用 '127.0.0.1' 。
family 是套接字(或者命名管道)使用的类型。它可以是以下一种:
'AF_INET'
( TCP 套接字类型),'AF_UNIX'
( Unix 域套接字) 或者'AF_PIPE'
( Windows 命名管道)。其中只有第一个保证各平台可用。如果 family 是None
,那么 family 会根据 address 的格式自动推导出来。如果 address 也是None
, 则取默认值。默认值为可用类型中速度最快的。见 地址格式 。注意,如果 family 是'AF_UNIX'
而address是``None`` ,套接字会在一个tempfile.mkstemp()
创建的私有临时目录中创建。如果监听器对象使用了套接字,backlog (默认值为1) 会在套接字绑定后传递给它的
listen()
方法。如果提供了 authkey 参数并且不是 None,那它必须是一个字符串并且会被当做基于 HMAC 认证的密钥。如果 authkey 是None 则不会有认证行为。认证失败抛出
AuthenticationError
异常,请查看 See 认证密码 。-
accept
()¶ 接受一个连接并返回一个
Connection
对象,其连接到的监听器对象已绑定套接字或者命名管道。如果已经尝试过认证并且失败了,则会抛出AuthenticationError
异常。
-
close
()¶ 关闭监听器对象上的绑定套接字或者命名管道。此函数会在监听器被垃圾回收后自动调用。不过仍然建议显式调用函数关闭。
监听器对象拥有下列只读属性:
-
address
¶ 监听器对象使用的地址。
-
last_accepted
¶ 最后一个连接所使用的地址。如果没有的话就是
None
。
3.3 新版功能: 监听器对象现在支持了上下文管理协议 - 见 上下文管理器类型 。
__enter__()
返回一个监听器对象,__exit__()
会调用close()
。-
-
multiprocessing.connection.
wait
(object_list, timeout=None)¶ 一直等待直到 object_list 中某个对象处于就绪状态。返回 object_list 中处于就绪状态的对象。如果 timeout 是一个浮点型,该方法会最多阻塞这么多秒。如果 timeout 是
None
,则会允许阻塞的事件没有限制。timeout为负数的情况下和为0的情况相同。对于 Unix 和 Windows ,满足下列条件的对象可以出现在 object_list 中
可读的
Connection
对象;一个已连接并且可读的
socket.socket
对象;或者
当一个连接或者套接字对象拥有有效的数据可被读取的时候,或者另一端关闭后,这个对象就处于就绪状态。
Unix:
wait(object_list, timeout)
和select.select(object_list, [], [], timeout)
几乎相同。差别在于,如果select.select()
被信号中断,它会抛出一个附带错误号为EINTR
的OSError
异常,而wait()
不会。Windows: object_list 中的元素必须是一个表示为整数的可等待的句柄(按照 Win32 函数
WaitForMultipleObjects()
的文档中所定义) 或者一个拥有fileno()
方法的对象,这个对象返回一个套接字句柄或者管道句柄。(注意管道和套接字两种句柄 不是 可等待的句柄)3.3 新版功能.
示例
下面的服务代码创建了一个使用 'secret password'
作为认证密码的监听器。它会等待连接然后发送一些数据给客户端:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
下面的代码连接到服务然后从服务器上j接收一些数据:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
下面的代码使用了 wait()
,以便在同时等待多个进程发来消息。
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
地址格式¶
'AF_INET'
地址是(hostname, port)
形式的元组类型,其中 hostname 是一个字符串,port 是整数。'AF_UNIX'
地址是文件系统上文件名的字符串。'AF_PIPE'
地址是一个r'\.\pipe{PipeName}'
形式的字符串。 要使用Client()
来连接到远程计算机上一个名为 ServerName 的命名管道,则应当改用r'\ServerName\pipe{PipeName}'
形式的地址。
注意,使用两个反斜线开头的字符串默认被当做 'AF_PIPE'
地址而不是 'AF_UNIX'
。
认证密码¶
当使用 Connection.recv
接收数据时,数据会自动被反序列化。不幸的是,对于一个不可信的数据源发来的数据,反序列化是存在安全风险的。所以 Listener
和 Client()
之间使用 hmac
模块进行摘要认证。
认证密钥是一个 byte 类型的字符串,可以认为是和密码一样的东西,连接建立好后,双方都会要求另一方证明知道认证密钥。(这个证明过程不会通过连接发送密钥)
如果要求认证但是没有指定认证密钥,则会使用 current_process().authkey
的返回值 (参见 Process
)。 这个值将被当前进程所创建的任何 Process
对象自动继承。 这意味着 (在默认情况下) 一个包含多进程的程序中的所有进程会在相互间建立连接的时候共享单个认证密钥。
os.urandom()
也可以用来生成合适的认证密钥。
日志记录¶
当前模块也提供了一些对 logging 的支持。注意, logging
模块本身并没有使用进程间共享的锁,所以来自于多个进程的日志可能(具体取决于使用的日志 handler 类型)相互覆盖或者混杂。
-
multiprocessing.
get_logger
()¶ 返回
multiprocessing
使用的 logger,必要的话会创建一个新的。如果创建的首个 logger 日志级别为
logging.NOTSET
并且没有默认 handler。通过这个 logger 打印的消息不会传递到根 logger。注意在 Windows 上,子进程只会继承父进程 logger 的日志级别 - 对于logger的其他自定义项不会继承。
-
multiprocessing.
log_to_stderr
()¶ 此函数会调用
get_logger()
但是会在返回的 logger 上增加一个 handler,将所有输出都使用'[%(levelname)s/%(processName)s] %(message)s'
的格式发送到sys.stderr
。
下面是一个在交互式解释器中打开日志功能的例子:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
要查看日志等级的完整列表,见 logging
模块。
multiprocessing.dummy
模块¶
multiprocessing.dummy
复制了 multiprocessing
的 API,不过是在 threading
模块之上包装了一层。
特别地,multiprocessing.dummy
所提供的 Pool
函数会返回一个 ThreadPool
的实例,该类是 Pool
的子类,它支持所有相同的方法调用但会使用一个工作线程池而非工作进程池。
-
class
multiprocessing.pool.
ThreadPool
([processes[, initializer[, initargs]]])¶ 一个线程池对象,用来控制可向其提交任务的工作线程池。
ThreadPool
实例与Pool
实例是完全接口兼容的,并且它们的资源也必须被正确地管理,或者是将线程池作为上下文管理器来使用,或者是通过手动调用close()
和terminate()
。processes 是要使用的工作线程数目。 如果 processes 为
None
,则使用os.cpu_count()
返回的值。如果 initializer 不为
None
,则每个工作进程将会在启动时调用initializer(*initargs)
。不同于
Pool
,maxtasksperchild 和 context 不可被提供。注解
ThreadPool
具有与Pool
相同的接口,它围绕一个进程池进行设计并且先于concurrent.futures
模块的引入。 因此,它继承了一些对于基于线程的池来说没有意义的操作,并且它具有自己的用于表示异步任务状态的类型AsyncResult
,该类型不为任何其他库所知。用户通常应该倾向于使用
concurrent.futures.ThreadPoolExecutor
,它拥有从一开始就围绕线程进行设计的更简单接口,并且返回与许多其他库相兼容的concurrent.futures.Future
实例,包括asyncio
库。
编程指导¶
使用 multiprocessing
时,应遵循一些指导原则和习惯用法。
所有start方法¶
下面这些适用于所有start方法。
避免共享状态
应该尽可能避免在进程间传递大量数据,越少越好。
最好坚持使用队列或者管道进行进程间通信,而不是底层的同步原语。
可序列化
保证所代理的方法的参数是可以序列化的。
代理的线程安全性
不要在多线程中同时使用一个代理对象,除非你用锁保护它。
(而在不同进程中使用 相同 的代理对象却没有问题。)
使用 Join 避免僵尸进程
在 Unix 上,如果一个进程执行完成但是没有被 join,就会变成僵尸进程。一般来说,僵尸进程不会很多,因为每次新启动进程(或者
active_children()
被调用)时,所有已执行完成且没有被 join 的进程都会自动被 join,而且对一个执行完的进程调用Process.is_alive
也会 join 这个进程。尽管如此,对自己启动的进程显式调用 join 依然是最佳实践。
继承优于序列化、反序列化
当使用 spawn 或者 forkserver 的启动方式时,
multiprocessing
中的许多类型都必须是可序列化的,这样子进程才能使用它们。但是通常我们都应该避免使用管道和队列发送共享对象到另外一个进程,而是重新组织代码,对于其他进程创建出来的共享对象,让那些需要访问这些对象的子进程可以直接将这些对象从父进程继承过来。
避免杀死进程
通过
Process.terminate
停止一个进程很容易导致这个进程正在使用的共享资源(如锁、信号量、管道和队列)损坏或者变得不可用,无法在其他进程中继续使用。所以,最好只对那些从来不使用共享资源的进程调用
Process.terminate
。
Join 使用队列的进程
记住,往队列放入数据的进程会一直等待直到队列中所有项被"feeder" 线程传给底层管道。(子进程可以调用队列的
Queue.cancel_join_thread
方法禁止这种行为)这意味着,任何使用队列的时候,你都要确保在进程join之前,所有存放到队列中的项将会被其他进程、线程完全消费。否则不能保证这个写过队列的进程可以正常终止。记住非精灵进程会自动 join 。
下面是一个会导致死锁的例子:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()交换最后两行可以修复这个问题(或者直接删掉
p.join()
)。
显式传递资源给子进程
在Unix上,使用 fork 方式启动的子进程可以使用父进程中全局创建的共享资源。不过,最好是显式将资源对象通过参数的形式传递给子进程。
除了(部分原因)让代码兼容 Windows 以及其他的进程启动方式外,这种形式还保证了在子进程生命期这个对象是不会被父进程垃圾回收的。如果父进程中的某些对象被垃圾回收会导致资源释放,这就变得很重要。
所以对于实例:
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()应当重写成这样:
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
谨防将 sys.stdin
数据替换为 “类似文件的对象”
multiprocessing
原本会无条件地这样调用:os.close(sys.stdin.fileno())在
multiprocessing.Process._bootstrap()
方法中 —— 这会导致与"进程中的进程"相关的一些问题。这已经被修改成了:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)它解决了进程相互冲突导致文件描述符错误的根本问题,但是对使用带缓冲的“文件类对象”替换
sys.stdin()
作为输出的应用程序造成了潜在的危险。如果多个进程调用了此文件类对象的close()
方法,会导致相同的数据多次刷写到此对象,损坏数据。如果你写入文件类对象并实现了自己的缓存,可以在每次追加缓存数据时记录当前进程id,从而将其变成 fork 安全的,当发现进程id变化后舍弃之前的缓存,例如:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
spawn 和 forkserver 启动方式¶
相对于 fork 启动方式,有一些额外的限制。
更依赖序列化
Process.__init__()
的所有参数都必须可序列化。同样的,当你继承Process
时,需要保证当调用Process.start
方法时,实例可以被序列化。
全局变量
记住,如果子进程中的代码尝试访问一个全局变量,它所看到的值(如果有)可能和父进程中执行
Process.start
那一刻的值不一样。当全局变量知识模块级别的常量时,是不会有问题的。
安全导入主模块
确保主模块可以被新启动的Python解释器安全导入而不会引发什么副作用(比如又启动了一个子进程)
例如,使用 spawn 或 forkserver 启动方式执行下面的模块,会引发
RuntimeError
异常而失败。from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()应该通过下面的方法使用
if __name__ == '__main__':
,从而保护程序"入口点":from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(如果程序将正常运行而不是冻结,则可以省略
freeze_support()
行)这允许新启动的 Python 解释器安全导入模块然后运行模块中的
foo()
函数。如果主模块中创建了进程池或者管理器,这个规则也适用。
例子¶
创建和使用自定义管理器、代理的示例:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
使用 Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
一个演示如何使用队列来向一组工作进程提供任务并收集结果的例子:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()