在池进程之间共享一个队列对象
Share a queue object between pool processes
我使用 pathos
ProcessingPool
class 来安排跨多个内核并发执行 run_regex()
函数。该函数将正则表达式作为参数并评估列表条目以进行匹配。如果找到匹配项,它将匹配值放入 result_queue
.
据我了解,目前每个工作进程都会在其虚拟地址 space 中创建 result_queue
的本地副本。但是,我想将此 Queue 对象用作共享内存机制,以便从主进程访问所有匹配项。
问题:
- 有没有办法将 Queue 对象传递到 Pool 初始化程序中,以便队列充当共享内存部分?
- 队列对象需要同步吗?
- 有没有更好的方法来解决这个问题?
代码片段
from multiprocessing import Lock, Queue
from pathos.multiprocessing import ProcessingPool
result_queue = Queue()
lock = Lock()
data = {}
def run_regex(self, expr):
for key, value in data.iteritems():
matchStr = re.search(expr, key, re.I)
if matchStr:
lock.acquire()
result_queue.put(key)
lock.release()
break
def check_path(self):
pool = ProcessingPool()
pool.map(run_regex, in_regex)
- 是的,你可以看一下
Pool
对象的initializer参数。
Queue
对象已经是 mp 安全的,所以没有必要保护它们。
您不需要 run_regex
函数中的 Queue
到 return 值。您可以简单地 return 函数中的 key
,它会在 map
结果中可用。
def run_regex(expr):
group = []
for key, value in data.iteritems():
match = re.search(expr, key, re.I)
if match is not None:
group.append(key)
return group
groups = pool.map(run_regex, in_regex)
keys = [key for group in groups for key in group]
或
keys = list(itertools.chain.from_iterable(groups))
map
将 return 按 run_regex
分组的键。之后您可以轻松地展开列表。
我使用 pathos
ProcessingPool
class 来安排跨多个内核并发执行 run_regex()
函数。该函数将正则表达式作为参数并评估列表条目以进行匹配。如果找到匹配项,它将匹配值放入 result_queue
.
据我了解,目前每个工作进程都会在其虚拟地址 space 中创建 result_queue
的本地副本。但是,我想将此 Queue 对象用作共享内存机制,以便从主进程访问所有匹配项。
问题:
- 有没有办法将 Queue 对象传递到 Pool 初始化程序中,以便队列充当共享内存部分?
- 队列对象需要同步吗?
- 有没有更好的方法来解决这个问题?
代码片段
from multiprocessing import Lock, Queue
from pathos.multiprocessing import ProcessingPool
result_queue = Queue()
lock = Lock()
data = {}
def run_regex(self, expr):
for key, value in data.iteritems():
matchStr = re.search(expr, key, re.I)
if matchStr:
lock.acquire()
result_queue.put(key)
lock.release()
break
def check_path(self):
pool = ProcessingPool()
pool.map(run_regex, in_regex)
- 是的,你可以看一下
Pool
对象的initializer参数。 Queue
对象已经是 mp 安全的,所以没有必要保护它们。您不需要
run_regex
函数中的Queue
到 return 值。您可以简单地 return 函数中的key
,它会在map
结果中可用。def run_regex(expr): group = [] for key, value in data.iteritems(): match = re.search(expr, key, re.I) if match is not None: group.append(key) return group groups = pool.map(run_regex, in_regex) keys = [key for group in groups for key in group]
或
keys = list(itertools.chain.from_iterable(groups))
map
将 return 按run_regex
分组的键。之后您可以轻松地展开列表。