Python 多处理,将文件作为深度拷贝传递?
Python Multiprocessing, passing files as deepcopies?
我有一个非常可并行化的任务:给定一个 ID,在图形结构中查找它,查看它指向的 ID 并计算它们值的平均值。
我正在为此任务使用 python 多处理,我有一个名为 graph
的对象,它总结了我感兴趣的属性和层次结构。
以一种非常天真的方式使用python多处理:
processes = []
for i in range(10):
currDataset = getOneTenthofDataset(fullDataset)
process = multiprocessing.process(name='name', target=func1, args=(currDataset, graph))
processes.append(process)
process.start()
for p in processes:
p.join()
def func1(dataset, graph):
...
for each row of dataset query on graph, update current row accordingly
save dataset to file (I don't need to return the splitted datasets!)
我发现计算时间几乎没有加速(我认为这可能是因为图上的查询正在排队而不是并行完成?)
所以我所做的是
def func2(dataset, graph):
localGraph = copy.deepcopy(graph)
.. same code as func1 ..
这导致了一个神秘而奇怪的错误!事实上,并非所有进程都正确结束,但只有 1/3 到 1/2,并且每次都有不同的进程正确结束。避免 deepcopy 而不是一切正常。
另一个奇怪的行为是,没有走到尽头的进程不会抛出任何错误,它们只是停止执行,不会转到 deepcopy 后面的指令!
最后,程序终止,没有任何错误
multiprocessing
需要 serialize/deserialize 您传递给函数的任何参数。如果您的数据集很大,那将是昂贵的。在分叉系统上(不是 Windows!),您可以利用子进程获取父进程内存的写时复制视图并跳过复制这一事实。将 fullDataset
和 graph
放在 func1
可以找到它的地方,而无需将其作为参数传递,例如模块的 global
命名空间。由于您的示例代码已经在模块级别,我假设您已经完成了。将 getOneTenthOfDataset
替换为 getDatasetSlice
(当然是我编的),你可以开始了。
processes = []
for i in range(10):
process = multiprocessing.process(name='name', target=func1, args=(i,))
processes.append(process)
process.start()
for p in processes:
p.join()
def func1(i):
dataset = getDatasetSlice(i)
graph # Its already in module namespace
....
for each row of dataset query on graph, update current row accordingly
save dataset to file (I don't need to return the splitted datasets!)
我有一个非常可并行化的任务:给定一个 ID,在图形结构中查找它,查看它指向的 ID 并计算它们值的平均值。
我正在为此任务使用 python 多处理,我有一个名为 graph
的对象,它总结了我感兴趣的属性和层次结构。
以一种非常天真的方式使用python多处理:
processes = []
for i in range(10):
currDataset = getOneTenthofDataset(fullDataset)
process = multiprocessing.process(name='name', target=func1, args=(currDataset, graph))
processes.append(process)
process.start()
for p in processes:
p.join()
def func1(dataset, graph):
...
for each row of dataset query on graph, update current row accordingly
save dataset to file (I don't need to return the splitted datasets!)
我发现计算时间几乎没有加速(我认为这可能是因为图上的查询正在排队而不是并行完成?) 所以我所做的是
def func2(dataset, graph):
localGraph = copy.deepcopy(graph)
.. same code as func1 ..
这导致了一个神秘而奇怪的错误!事实上,并非所有进程都正确结束,但只有 1/3 到 1/2,并且每次都有不同的进程正确结束。避免 deepcopy 而不是一切正常。 另一个奇怪的行为是,没有走到尽头的进程不会抛出任何错误,它们只是停止执行,不会转到 deepcopy 后面的指令! 最后,程序终止,没有任何错误
multiprocessing
需要 serialize/deserialize 您传递给函数的任何参数。如果您的数据集很大,那将是昂贵的。在分叉系统上(不是 Windows!),您可以利用子进程获取父进程内存的写时复制视图并跳过复制这一事实。将 fullDataset
和 graph
放在 func1
可以找到它的地方,而无需将其作为参数传递,例如模块的 global
命名空间。由于您的示例代码已经在模块级别,我假设您已经完成了。将 getOneTenthOfDataset
替换为 getDatasetSlice
(当然是我编的),你可以开始了。
processes = []
for i in range(10):
process = multiprocessing.process(name='name', target=func1, args=(i,))
processes.append(process)
process.start()
for p in processes:
p.join()
def func1(i):
dataset = getDatasetSlice(i)
graph # Its already in module namespace
....
for each row of dataset query on graph, update current row accordingly
save dataset to file (I don't need to return the splitted datasets!)