在多个进程中共享 Python dict
Share Python dict across many processes
我正在开发一种启发式算法来为 NP(因此 CPU 密集)问题找到“好的”解决方案。
我正在使用 Python 实施我的解决方案(我同意这不是最佳选择,因为速度是一个问题,但确实如此)并且我将工作量分配给许多子流程,每个子流程负责探索 space 可能解决方案的一个分支。
为了提高性能,我想在所有子流程之间共享每个子流程执行期间收集的一些信息。
收集此类信息的“明显”方法是将它们收集在一个字典中,该字典的键是(冻结的)整数集,值是整数列表(或集)。
因此,共享字典必须是每个子进程的可读和 writable,但我可以放心地预期读取将比写入更频繁,因为子进程只有在发现“有趣”的东西时才会写入共享字典并且会更频繁地阅读字典以了解某个解决方案是否已经被其他进程评估过(以避免探索同一个分支两次或更多次)。
我预计此类词典的大小不会超过 10 MB。
目前,我使用 multiprocessing.Manager()
的一个实例实现了共享字典,它负责处理开箱即用的共享字典的并发访问。
然而(根据我所发现的)这种共享数据的方式是使用进程之间的管道实现的,这比普通和简单的共享内存慢得多(此外,字典必须在通过管道发送之前进行 pickle,并在接收时取消 pickled ).
到目前为止,我的代码如下所示:
# main.py
import multiprocessing as mp
import os
def worker(a, b, c, shared_dict):
while condition:
# do things
# sometimes reads from shared_dict to check if a candidate solution has already been evaluated by other process
# if not, evaluate it and store it inside the shared_dict together with some related info
return worker_result
def main():
with mp.Manager() as manager:
# setup params a, b, c, ...
# ...
shared_dict = manager.dict()
n_processes = os.cpu_count()
with mp.Pool(processes=n_processes) as pool:
async_results = [pool.apply_async(worker, (a, b, c, shared_dict)) for _ in range(n_processes)]
results = [res.get() for res in async_results]
# gather the overall result from 'results' list
if __name__ == '__main__':
main()
为了避免管道的开销,我想使用共享内存,但 Python 标准库似乎没有提供直接的方法来处理共享内存中的字典。
据我所知,Python 标准库提供了帮助程序,仅用于标准 ctypes(使用 multiprocessing.Value
and multiprocessing.Array
) or gives you access to raw areas of shared memory.
将数据存储在共享内存中
我不想在共享内存的原始区域中实现我自己的散列 table,因为我既不是散列 table 的专家,也不是并发编程的专家,相反我想知道如果有其他更快的解决方案可以满足我的需求,而不需要从零开始编写所有内容。
例如,我已经看到 ray library 允许比使用管道更快地读取写在共享内存中的数据,但是一旦字典被序列化并写入共享内存区域,您似乎就无法修改它。
有什么帮助吗?
不幸的是,Ray 中的共享内存必须是不可变的。通常,建议您将参与者用于可变状态。 (see here).
你可以和演员一起玩一些把戏。例如,如果值是不可变的,您可以将对象引用存储在字典中。然后字典本身不会在共享内存中,但它的所有对象都会。
@ray.remote
class DictActor
def __init__(self):
self._dict = {}
def put(self, key, value):
self._dict[key] = ray.put(value)
def get(self, key):
return self._dict[key]
d = DictActor.remote()
ray.get(d.put.remote("a", np.zeros(100)))
ray.get(d.get.remote("a")) # This result is in shared memory.
我正在开发一种启发式算法来为 NP(因此 CPU 密集)问题找到“好的”解决方案。
我正在使用 Python 实施我的解决方案(我同意这不是最佳选择,因为速度是一个问题,但确实如此)并且我将工作量分配给许多子流程,每个子流程负责探索 space 可能解决方案的一个分支。
为了提高性能,我想在所有子流程之间共享每个子流程执行期间收集的一些信息。 收集此类信息的“明显”方法是将它们收集在一个字典中,该字典的键是(冻结的)整数集,值是整数列表(或集)。 因此,共享字典必须是每个子进程的可读和 writable,但我可以放心地预期读取将比写入更频繁,因为子进程只有在发现“有趣”的东西时才会写入共享字典并且会更频繁地阅读字典以了解某个解决方案是否已经被其他进程评估过(以避免探索同一个分支两次或更多次)。 我预计此类词典的大小不会超过 10 MB。
目前,我使用 multiprocessing.Manager()
的一个实例实现了共享字典,它负责处理开箱即用的共享字典的并发访问。
然而(根据我所发现的)这种共享数据的方式是使用进程之间的管道实现的,这比普通和简单的共享内存慢得多(此外,字典必须在通过管道发送之前进行 pickle,并在接收时取消 pickled ).
到目前为止,我的代码如下所示:
# main.py
import multiprocessing as mp
import os
def worker(a, b, c, shared_dict):
while condition:
# do things
# sometimes reads from shared_dict to check if a candidate solution has already been evaluated by other process
# if not, evaluate it and store it inside the shared_dict together with some related info
return worker_result
def main():
with mp.Manager() as manager:
# setup params a, b, c, ...
# ...
shared_dict = manager.dict()
n_processes = os.cpu_count()
with mp.Pool(processes=n_processes) as pool:
async_results = [pool.apply_async(worker, (a, b, c, shared_dict)) for _ in range(n_processes)]
results = [res.get() for res in async_results]
# gather the overall result from 'results' list
if __name__ == '__main__':
main()
为了避免管道的开销,我想使用共享内存,但 Python 标准库似乎没有提供直接的方法来处理共享内存中的字典。
据我所知,Python 标准库提供了帮助程序,仅用于标准 ctypes(使用 multiprocessing.Value
and multiprocessing.Array
) or gives you access to raw areas of shared memory.
我不想在共享内存的原始区域中实现我自己的散列 table,因为我既不是散列 table 的专家,也不是并发编程的专家,相反我想知道如果有其他更快的解决方案可以满足我的需求,而不需要从零开始编写所有内容。 例如,我已经看到 ray library 允许比使用管道更快地读取写在共享内存中的数据,但是一旦字典被序列化并写入共享内存区域,您似乎就无法修改它。
有什么帮助吗?
不幸的是,Ray 中的共享内存必须是不可变的。通常,建议您将参与者用于可变状态。 (see here).
你可以和演员一起玩一些把戏。例如,如果值是不可变的,您可以将对象引用存储在字典中。然后字典本身不会在共享内存中,但它的所有对象都会。
@ray.remote
class DictActor
def __init__(self):
self._dict = {}
def put(self, key, value):
self._dict[key] = ray.put(value)
def get(self, key):
return self._dict[key]
d = DictActor.remote()
ray.get(d.put.remote("a", np.zeros(100)))
ray.get(d.get.remote("a")) # This result is in shared memory.