Python 多处理 DictProxy 附加到列表的字典不起作用
Python Multiprocessing DictProxy append to dict of list not working
我正在编写一个使用 multiprocessing.managers.DictProxy 的程序。程序遍历一个目录,通过pwd.getpwuid(os.stat(file)[4])[0]
得到的username作为key创建了一个dict,key对应的value是一个列表,其中包含了该用户拥有的文件。例如,对于假定的数据结构:
{'root': ["/boot/vmlinuz", "/boot/grub"], 'testuser': ["/home/testuser", "/home/testuser/.bashrc"]}
我为此编写的代码是
#!/usr/bin/python
import os
import multiprocessing
import sys
import pwd
import grp
manager_attributes = multiprocessing.Manager()
file_stats_user = manager_attributes.dict()
def filestat(file):
try:
stat = os.stat(file)
user = pwd.getpwuid(stat[4])[0]
group = grp.getgrgid(stat[5])[0]
if user not in file_stats_user:
file_stats_user[user] = []
file_stats_user[user].append(file)
except OSError, e:
print e
try:
cores = (multiprocessing.cpu_count()*2)
except:
cores = 8
print "Starting parallel execution with ", cores, "concurrency"
pool_get_attributes = multiprocessing.Pool(cores)
pool_get_attributes.map(filestat, files)
pool_get_attributes.close()
pool_get_attributes.join()
其中 files 是获取的所有文件的列表。
但是,当我打印 file_stats_user 这是一个 multiprocessing.managers.DictProxy 对象时,我得到了键,但是列表值为空,因为
{'root': [], 'testuser': []}
file_stats_user[user].append(file)
没有将文件名附加到相应的键。
我在这里做错了什么?
谢谢。
编辑 如果两个进程同时更新同一个用户的字典,则先前的解决方案存在竞争条件
您的解决方案不起作用,因为默认情况下,列表是当前进程的本地列表,并且不会代理对其进行的更新。
为了使工作正常,您应该使用一个代理列表,以便它可以在所有进程中更新
替换
file_stats_user[user] = []
来自
file_stats_user[user] = manager_attributes.list()
现在,当您附加到列表时,它在所有进程中都是相同的
但是使用共享对象是执行多进程的糟糕方法
更好的方法是收集每次调用的结果
from collections import defaultdict
def filestat(file):
try:
stat = os.stat(file)
user = pwd.getpwuid(stat[4])[0]
group = grp.getgrgid(stat[5])[0]
return user, file
except OSError, e:
print e
return None
try:
cores = (multiprocessing.cpu_count()*2)
except:
cores = 8
print "Starting parallel execution with ", cores, "concurrency"
pool_get_attributes = multiprocessing.Pool(cores)
result_map = pool_get_attributes.map(filestat, files)
result = defaultdict(list)
for user,file in (r for r in result_map if r is not None):
result[user].append(file)
pool_get_attributes.close()
pool_get_attributes.join()
我正在编写一个使用 multiprocessing.managers.DictProxy 的程序。程序遍历一个目录,通过pwd.getpwuid(os.stat(file)[4])[0]
得到的username作为key创建了一个dict,key对应的value是一个列表,其中包含了该用户拥有的文件。例如,对于假定的数据结构:
{'root': ["/boot/vmlinuz", "/boot/grub"], 'testuser': ["/home/testuser", "/home/testuser/.bashrc"]}
我为此编写的代码是
#!/usr/bin/python
import os
import multiprocessing
import sys
import pwd
import grp
manager_attributes = multiprocessing.Manager()
file_stats_user = manager_attributes.dict()
def filestat(file):
try:
stat = os.stat(file)
user = pwd.getpwuid(stat[4])[0]
group = grp.getgrgid(stat[5])[0]
if user not in file_stats_user:
file_stats_user[user] = []
file_stats_user[user].append(file)
except OSError, e:
print e
try:
cores = (multiprocessing.cpu_count()*2)
except:
cores = 8
print "Starting parallel execution with ", cores, "concurrency"
pool_get_attributes = multiprocessing.Pool(cores)
pool_get_attributes.map(filestat, files)
pool_get_attributes.close()
pool_get_attributes.join()
其中 files 是获取的所有文件的列表。
但是,当我打印 file_stats_user 这是一个 multiprocessing.managers.DictProxy 对象时,我得到了键,但是列表值为空,因为
{'root': [], 'testuser': []}
file_stats_user[user].append(file)
没有将文件名附加到相应的键。
我在这里做错了什么?
谢谢。
编辑 如果两个进程同时更新同一个用户的字典,则先前的解决方案存在竞争条件
您的解决方案不起作用,因为默认情况下,列表是当前进程的本地列表,并且不会代理对其进行的更新。
为了使工作正常,您应该使用一个代理列表,以便它可以在所有进程中更新
替换
file_stats_user[user] = []
来自
file_stats_user[user] = manager_attributes.list()
现在,当您附加到列表时,它在所有进程中都是相同的
但是使用共享对象是执行多进程的糟糕方法
更好的方法是收集每次调用的结果
from collections import defaultdict
def filestat(file):
try:
stat = os.stat(file)
user = pwd.getpwuid(stat[4])[0]
group = grp.getgrgid(stat[5])[0]
return user, file
except OSError, e:
print e
return None
try:
cores = (multiprocessing.cpu_count()*2)
except:
cores = 8
print "Starting parallel execution with ", cores, "concurrency"
pool_get_attributes = multiprocessing.Pool(cores)
result_map = pool_get_attributes.map(filestat, files)
result = defaultdict(list)
for user,file in (r for r in result_map if r is not None):
result[user].append(file)
pool_get_attributes.close()
pool_get_attributes.join()