Python Multiprocessing concurrency using Manager, Pool and a shared list not working
I am learning Python multiprocessing, and I am trying to use it to populate a list with all the files present on the OS. However, the code I wrote executes only sequentially.
#!/usr/bin/python
import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]  # Gets the top-level directory names inside "/"
manager = multiprocessing.Manager()
files = manager.list()

def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

mp = [multiprocessing.Process(target=get_files, args=(tld[x],))
      for x in range(len(tld))]
for i in mp:
    i.start()
    i.join()
print len(files)
When I check the process tree, I can see only one child process spawned. (man pstree says that {} denotes the child process spawned by the parent.)
---bash(10949)---python(12729)-+-python(12730)---{python}(12752)
`-python(12750)
What I was looking for was to spawn a process for each tld directory to populate the shared list files, which would mean about 10-15 processes depending on the number of directories. What am I doing wrong?
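A note on the version above: the loop joins each process immediately after starting it, so each child runs to completion before the next one starts, which is why the work is sequential. Starting all the processes first and only then joining them lets them run concurrently; a minimal sketch of that change, reusing the mp list above:

for i in mp:
    i.start()
for i in mp:
    i.join()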
EDIT:
I used multiprocessing.Pool to create the workers, and this time the processes are spawned, but it errors out when I try to use multiprocessing.Pool.map(). I was referring to the following code from the Python docs:
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))
Following that example, I rewrote the code as
import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
pool = multiprocessing.Pool(processes=len(tld))
print pool
files = manager.list()

def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)
It is forking multiple processes.
---bash(10949)---python(12890)-+-python(12967)
|-python(12968)
|-python(12970)
|-python(12971)
|-python(12972)
---snip---
But then the code errors out, saying
Process PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
AttributeError: 'module' object has no attribute 'get_files'
(each pool worker prints the same traceback, interleaved)
What am I doing wrong here, and why does the get_files() function error out?
This is simply because you instantiate your pool before defining the function get_files:
import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
files = manager.list()

def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

pool = multiprocessing.Pool(processes=len(tld))  # Instantiate the pool here
pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)
The general idea with a process is that the moment you start it, you fork the memory of the main process. So any definition made in the main process after the fork will not exist in the child process.
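For illustration, here is a minimal repro of the same failure mode, assuming the fork start method (the default on Linux; the names are made up for the example). The workers are forked when the Pool is created, so a function defined afterwards cannot be looked up when a task referencing it is unpickled in the worker:

import multiprocessing

pool = multiprocessing.Pool(2)  # workers fork here, capturing the module state as it is now

def g(x):                       # defined after the fork: the forked workers never see it
    return x * x

if __name__ == '__main__':
    print(pool.map(g, [1, 2]))  # each worker raises AttributeError: no attribute 'g'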
If you want shared memory, you can use the threading library instead, but then you will run into other issues (cf. the global interpreter lock).
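For completeness, a minimal sketch of the same walk using threads (Python 3 syntax, names chosen for the example). Threads share the parent's memory, so a plain list works without a Manager, and since os.walk releases the GIL while waiting on the filesystem, the I/O can still overlap:

import os
import threading

files = []  # plain list: threads share the parent's memory

def get_files(x):
    for root, dirs, names in os.walk(x):
        for name in names:
            files.append(os.path.join(root, name))  # list.append is thread-safe under the GIL

tld = [os.path.join("/", f) for f in next(os.walk("/"))[1]]
threads = [threading.Thread(target=get_files, args=(d,)) for d in tld]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(files))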
I ran across this and tried the accepted answer on Python 3.x, where it did not work for a couple of reasons. Here is a modified version that does work (on Python 3.10.1, as of this writing):
import multiprocessing
import os

def get_files(x, files_):
    proc = multiprocessing.current_process()
    for root, dir, file in os.walk(x):
        for name in file:
            full_path = os.path.join(root, name)
            # print(f"worker:{proc.name} path:{full_path}")
            files_.append(full_path)

if __name__ == '__main__':
    # See https://docs.python.org/3/library/multiprocessing.html
    with multiprocessing.Manager() as manager:
        # The code will count the number of result_files under the specified root:
        root = '/'
        # Create the top-level list of folders which will be walked (and result_files counted)
        tld = [os.path.join(root, filename) for filename in next(os.walk(root))[1]]
        # Create the result list object in the manager; it is passed to the workers to collect results into.
        result_files = manager.list()
        # Create a pool of workers, with the size being equal to the number of top-level folders:
        pool = multiprocessing.Pool(processes=len(tld))
        # Use starmap() instead of map() to allow passing multiple arguments (i.e. the folder and the result_files list).
        pool.starmap(get_files, [(folder, result_files) for folder in tld])
        pool.close()
        pool.join()
        # The result: the count of the number of result_files.
        print(len(result_files))
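A side note on the manager list: every append on a manager.list() is a round trip to the manager process, which can dominate the runtime on large trees. A common alternative, sketched here under the same assumptions rather than as part of the answer above, is to have each worker build and return a plain list and concatenate the results in the parent:

import multiprocessing
import os

def get_files(folder):
    found = []
    for root, dirs, names in os.walk(folder):
        for name in names:
            found.append(os.path.join(root, name))
    return found  # the returned list is pickled back to the parent

if __name__ == '__main__':
    root = '/'
    tld = [os.path.join(root, d) for d in next(os.walk(root))[1]]
    with multiprocessing.Pool(processes=len(tld)) as pool:
        per_folder = pool.map(get_files, tld)
    all_files = [path for sub in per_folder for path in sub]
    print(len(all_files))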