为什么 multiprocessing 的 apply_async 如此挑剔?
Why is multiprocessing's apply_async so picky?
可以正常工作的示例代码:
from multiprocessing import *
import time
import random
def myfunc(d):
a = random.randint(0,1000)
d[a] = a
print("Process; %s" % a)
print("Starting mass threads")
man = Manager()
d = man.dict()
p = Pool(processes=8)
for i in range(0,100):
p.apply_async(myfunc, [d])
p.close()
p.join()
print(d)
print("Ending multiprocessing")
如果您将 p.apply_async(myfunc, [d])
更改为 p.apply_async(myfunc, (d))
或 p.apply_async(myfunc, d)
,则该池将根本无法工作。如果您将另一个参数添加到 myfunc
,然后只传入一个 None
,它将像这样工作 p.apply_async(myfunc, (None, d))
— 但为什么呢?
apply_async
的文档说明如下:
apply(func[, args[, kwds]])
Call func
with arguments args
and keyword arguments kwds
. It blocks until the result is ready. Given this blocks, apply_async()
is better suited for performing work in parallel. Additionally, func
is only executed in one of the workers of the pool.
因此,它不是采用星形和双星形参数,而是采用位置参数和关键字参数作为函数的第二个和第三个参数传递给目标函数;第二个必须是 iterable,第三个必须是 mapping。
请注意,由于应用是异步工作的,因此您不会看到任何异常,除非您从结果中 .wait
和 .get
它们;
您可以简单地尝试一下:
for i in range(0,100):
result = p.apply_async(myfunc, d)
print(result.get())
在上面的代码中,result.get()
等待第 100 个线程的完成和 returns 它的返回值 - 或者尝试,因为它会失败,因为托管字典不能用作位置参数:
Traceback (most recent call last):
File "test.py", line 21, in <module>
print(result.get())
File "/usr/lib/pythonN.N/multiprocessing/pool.py", line 558, in get
raise self._value
KeyError: 0
因此,看看你原来的问题:请注意 [d]
是一个长度为 1 的列表; (d)
等同于 d
;要获得长度为 1 的 元组 ,您需要键入 (d,)
。来自 Python 3 tutorial section 5.3:
A special problem is the construction of tuples containing 0 or 1
items: the syntax has some extra quirks to accommodate these. Empty
tuples are constructed by an empty pair of parentheses; a tuple with
one item is constructed by following a value with a comma (it is not
sufficient to enclose a single value in parentheses). Ugly, but
effective. For example:
>>> empty = ()
>>> singleton = 'hello', # <-- note trailing comma
>>> len(empty)
0
>>> len(singleton)
1
>>> singleton
('hello',)
(d,)
、[d]
、{d}
,甚至 iter(frozenset(d))
或 {d: True}
都可以很好地用作位置参数;所有这些作为 args
将产生一个 Iterable,其迭代器恰好产生 1 个值 - d
的值。另一方面,如果除了不幸的 managed dictionary 之外,你几乎传递了任何其他类型的值,你会得到一个更有用的错误;假设该值为 42
,您将得到:
TypeError: myfunc() argument after * must be a sequence, not int
可以正常工作的示例代码:
from multiprocessing import *
import time
import random
def myfunc(d):
a = random.randint(0,1000)
d[a] = a
print("Process; %s" % a)
print("Starting mass threads")
man = Manager()
d = man.dict()
p = Pool(processes=8)
for i in range(0,100):
p.apply_async(myfunc, [d])
p.close()
p.join()
print(d)
print("Ending multiprocessing")
如果您将 p.apply_async(myfunc, [d])
更改为 p.apply_async(myfunc, (d))
或 p.apply_async(myfunc, d)
,则该池将根本无法工作。如果您将另一个参数添加到 myfunc
,然后只传入一个 None
,它将像这样工作 p.apply_async(myfunc, (None, d))
— 但为什么呢?
apply_async
的文档说明如下:
apply(func[, args[, kwds]])
Call
func
with argumentsargs
and keyword argumentskwds
. It blocks until the result is ready. Given this blocks,apply_async()
is better suited for performing work in parallel. Additionally,func
is only executed in one of the workers of the pool.
因此,它不是采用星形和双星形参数,而是采用位置参数和关键字参数作为函数的第二个和第三个参数传递给目标函数;第二个必须是 iterable,第三个必须是 mapping。
请注意,由于应用是异步工作的,因此您不会看到任何异常,除非您从结果中 .wait
和 .get
它们;
您可以简单地尝试一下:
for i in range(0,100):
result = p.apply_async(myfunc, d)
print(result.get())
在上面的代码中,result.get()
等待第 100 个线程的完成和 returns 它的返回值 - 或者尝试,因为它会失败,因为托管字典不能用作位置参数:
Traceback (most recent call last):
File "test.py", line 21, in <module>
print(result.get())
File "/usr/lib/pythonN.N/multiprocessing/pool.py", line 558, in get
raise self._value
KeyError: 0
因此,看看你原来的问题:请注意 [d]
是一个长度为 1 的列表; (d)
等同于 d
;要获得长度为 1 的 元组 ,您需要键入 (d,)
。来自 Python 3 tutorial section 5.3:
A special problem is the construction of tuples containing 0 or 1 items: the syntax has some extra quirks to accommodate these. Empty tuples are constructed by an empty pair of parentheses; a tuple with one item is constructed by following a value with a comma (it is not sufficient to enclose a single value in parentheses). Ugly, but effective. For example:
>>> empty = () >>> singleton = 'hello', # <-- note trailing comma >>> len(empty) 0 >>> len(singleton) 1 >>> singleton ('hello',)
(d,)
、[d]
、{d}
,甚至 iter(frozenset(d))
或 {d: True}
都可以很好地用作位置参数;所有这些作为 args
将产生一个 Iterable,其迭代器恰好产生 1 个值 - d
的值。另一方面,如果除了不幸的 managed dictionary 之外,你几乎传递了任何其他类型的值,你会得到一个更有用的错误;假设该值为 42
,您将得到:
TypeError: myfunc() argument after * must be a sequence, not int