python 如果值多处理池中止
python multiprocessing pool abort if value
我正在开发一个脚本,我在其中随机创建 objects 但我不想重复。它们被存储起来,每次我创建一个新的时,我都会将它与现有的进行核对。因为我想对大量 objects 执行此操作,所以我现在正在尝试将其并行化,但到目前为止没有成功。我尝试了一些在网上找到的解决方案(实际上主要是这里),但仍然没有用。
我的想法是启动一个池并将我的函数映射到它。当一个进程找到一个匹配项时,它将一个值设置为 1。这个值对所有进程都是可读的,他们可以使用锁写入它,我需要它在最后到 return。因此,我创建了一个 Lock
和一个 Value
,以便所有进程都可以读取该值(因此是 lock=False
)并检查是否在另一个进程中找到了匹配项。然后我尝试了一些不同的 Event
并检查它是否已设置,但这仍然不起作用......然后我尝试提出一个特殊的 Exception
但仍然没有成功使代码成功.
拜托,我更喜欢 OOP 编程,所以在我最后一次资源之前我会避免定义一个 global
变量,因为我认为它们不确定(个人意见)。
这是一个 MWE,我用 int
替换了复杂的 objects,用 range(10000)
替换了存储的 objects,以帮助您理解。
#!/usr/bin/env python3
import multiprocessing as muproc
def ParallelCheck(me):
print(" Testing {}...".format(me))
#manager = muproc.Manager()
#lock = manager.Lock()
lock = muproc.Lock()
back = muproc.Value("i", 0, lock=False)
ParChild = ParallelChild(me, lock, back)
with muproc.Pool() as pool:
try:
pool.map(ParChild.run, range(10000))
except AbortPool:
pool.terminate()
print("pool")
return back.value
def Computation(me, neighbour):
return me == neighbour
class ParallelChild(object):
def __init__(self, me, lock, back):
self.abort = muproc.Event()
self.lock = lock
self.me = me
self.back = back
def run(self, neighbour):
print("run")
if self.abort.is_set():
print("Aborting")
pass
else:
if Computation(self.me, neighbour):
self.lock.acquire()
self.abort.set()
self.back.value = 1
print("GOTCHA")
self.lock.release()
raise AbortPool
else:
print("...")
class AbortPool(Exception):
#pass
def __init__(self):
## Just to check
print("AbortPool raised!")
if __name__ == "__main__":
values = [12000, 13, 7]
for v in values:
print("value={} match={}".format(v, ParallelCheck(v)))
现在它产生 RunTimeError
:
me@stigepc4$ python3 mwe.py
Testing 12000...
Traceback (most recent call last):
File "mwe.py", line 63, in <module>
print("value={} match={}".format(v, ParallelCheck(v)))
File "mwe.py", line 16, in ParallelCheck
pool.map(ParChild.run, range(10000))
File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
raise self._value
File "/usr/lib/python3.4/multiprocessing/pool.py", line 383, in _handle_tasks
put(task)
File "/usr/lib/python3.4/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/usr/lib/python3.4/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
File "/usr/lib/python3.4/multiprocessing/sharedctypes.py", line 128, in reduce_ctype
assert_spawning(obj)
File "/usr/lib/python3.4/multiprocessing/context.py", line 347, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: c_int objects should only be shared between processes through inheritance
我猜它与 Lock
(虽然评论 Manager
但这并没有更好地工作)或与 Value
有关,但现在知道如何获得摆脱它...
编辑
当我继续尝试更改我的代码以按我希望的方式工作时,我意识到我没有提到我的主要问题是什么。我真正的困难是如果找到匹配项,则让池中的所有进程停止。这就是我所需要的,因此 运行ning 并行优于串行。现在我可以有一个事件来告诉 child 运行 是否已经找到匹配项,但它会不断循环遍历数据,即使我引发异常...
编辑 2
简单地说,我有以下...
for o in objects:
if too_close(o, existing_objects):
return 1
return 0
...我想在 CPU 之间分配...
for o in objects:
if too_close(o, some_existing_objects):
return 1 and abort other processes
return 0
通过寻找答案,我的脚本有点太复杂了。
我试图从接近原始文档的内容开始
多处理模块。
然后没有成功,我寻找一种方法来修复它并添加了一些东西。
我不是 python 多处理方面的专家,但是在尝试了一段时间之后,
我发现在第一场比赛中中止 pool.map
的唯一方法是使用事件
让所有进程都知道它发生了,然后他们都抛出一个特殊的异常
中止自己。
我可以去掉 Value 和 Lock,它们对我来说没用。
但我做事的方式可能不是很有效。
产生这些进程将花费大量的计算时间,
每个进程都会将它需要的数据复制到自己的内存中运行。
我尝试生成更少的进程,但每个进程的数据都更少,而且
他们将自己迭代的数据集(不让池处理这部分)。
所以我可以选择哪些数据进入哪个进程。
在我的示例中,我将 range(10000)
拆分为例如4道工序
每个都有2500的范围。
我只想知道有没有匹配,所以我可以进一步简化。
我可以设置当找到匹配时,设置事件和函数 returns 以便它停止。
另一个进程测试事件的状态,并在设置后立即 return 停止自己。
现在回到主流程,最后只看事件
(当然不要忘记在开始时清除它)。
如果已设置,则找到匹配项,就这么简单。
缺点是我必须声明multiprocessing.Event
全局...
否则当进程产生时,每个子进程都会复制它
他们将无法在它们之间以及与主进程进行通信。
但是正如 bj0 已经提到的,将这个问题并行化可能不会更好...
实现这两种方法后,我将它们与串行问题进行了比较,这是我的结果
对于同一台机器的给定案例:
- 序列号:7s
- 有游泳池:910s
- 有 3 个进程,每个进程都有自己的数据集:97s
所以这里没有什么更好的...我将坚持我的串行实现并寻找其他方法来加速事情,比如除了完全随机的其他方法...
这是我的 MWE 的最后一个工作版本:
#!/usr/bin/env python3
import multiprocessing as muproc
def ParallelCheck(me):
print(" Testing {}...".format(me))
global abort
abort.clear()
ParChild = ParallelChild(me)
jobs = []
N = 4
for i in range(N):
jobs.append(muproc.Process(target = ParChild.run, args=(range(i * 2500, (i+1) * 2500),)))
for p in jobs:
p.start()
for p in jobs:
p.join()
if abort.is_set():
print("MATCH FOUND")
return 1
else:
print(" no match...")
return 0
def Computation(me, neighbour):
return me == neighbour
class ParallelChild(object):
def __init__(self, me):
self.me = me
def run(self, neighbours):
global abort
for neighbour in neighbours:
print("{} vs {} by {}".format(self.me, neighbour, self.CurProc()))
if abort.is_set():
print("Aborting {}".format(self.CurProc()))
return 0
else:
if Computation(self.me, neighbour):
abort.set()
print("GOTCHA {}".format(self.CurProc()))
return 1
def CurProc(self):
return muproc.current_process()._identity[0]
if __name__ == "__main__":
abort = muproc.Event()
values = [12000, 130, 7]
for v in values:
print("value={} match={}".format(v, ParallelCheck(v)))
我正在开发一个脚本,我在其中随机创建 objects 但我不想重复。它们被存储起来,每次我创建一个新的时,我都会将它与现有的进行核对。因为我想对大量 objects 执行此操作,所以我现在正在尝试将其并行化,但到目前为止没有成功。我尝试了一些在网上找到的解决方案(实际上主要是这里),但仍然没有用。
我的想法是启动一个池并将我的函数映射到它。当一个进程找到一个匹配项时,它将一个值设置为 1。这个值对所有进程都是可读的,他们可以使用锁写入它,我需要它在最后到 return。因此,我创建了一个 Lock
和一个 Value
,以便所有进程都可以读取该值(因此是 lock=False
)并检查是否在另一个进程中找到了匹配项。然后我尝试了一些不同的 Event
并检查它是否已设置,但这仍然不起作用......然后我尝试提出一个特殊的 Exception
但仍然没有成功使代码成功.
拜托,我更喜欢 OOP 编程,所以在我最后一次资源之前我会避免定义一个 global
变量,因为我认为它们不确定(个人意见)。
这是一个 MWE,我用 int
替换了复杂的 objects,用 range(10000)
替换了存储的 objects,以帮助您理解。
#!/usr/bin/env python3
import multiprocessing as muproc
def ParallelCheck(me):
print(" Testing {}...".format(me))
#manager = muproc.Manager()
#lock = manager.Lock()
lock = muproc.Lock()
back = muproc.Value("i", 0, lock=False)
ParChild = ParallelChild(me, lock, back)
with muproc.Pool() as pool:
try:
pool.map(ParChild.run, range(10000))
except AbortPool:
pool.terminate()
print("pool")
return back.value
def Computation(me, neighbour):
return me == neighbour
class ParallelChild(object):
def __init__(self, me, lock, back):
self.abort = muproc.Event()
self.lock = lock
self.me = me
self.back = back
def run(self, neighbour):
print("run")
if self.abort.is_set():
print("Aborting")
pass
else:
if Computation(self.me, neighbour):
self.lock.acquire()
self.abort.set()
self.back.value = 1
print("GOTCHA")
self.lock.release()
raise AbortPool
else:
print("...")
class AbortPool(Exception):
#pass
def __init__(self):
## Just to check
print("AbortPool raised!")
if __name__ == "__main__":
values = [12000, 13, 7]
for v in values:
print("value={} match={}".format(v, ParallelCheck(v)))
现在它产生 RunTimeError
:
me@stigepc4$ python3 mwe.py
Testing 12000...
Traceback (most recent call last):
File "mwe.py", line 63, in <module>
print("value={} match={}".format(v, ParallelCheck(v)))
File "mwe.py", line 16, in ParallelCheck
pool.map(ParChild.run, range(10000))
File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
raise self._value
File "/usr/lib/python3.4/multiprocessing/pool.py", line 383, in _handle_tasks
put(task)
File "/usr/lib/python3.4/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/usr/lib/python3.4/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
File "/usr/lib/python3.4/multiprocessing/sharedctypes.py", line 128, in reduce_ctype
assert_spawning(obj)
File "/usr/lib/python3.4/multiprocessing/context.py", line 347, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: c_int objects should only be shared between processes through inheritance
我猜它与 Lock
(虽然评论 Manager
但这并没有更好地工作)或与 Value
有关,但现在知道如何获得摆脱它...
编辑
当我继续尝试更改我的代码以按我希望的方式工作时,我意识到我没有提到我的主要问题是什么。我真正的困难是如果找到匹配项,则让池中的所有进程停止。这就是我所需要的,因此 运行ning 并行优于串行。现在我可以有一个事件来告诉 child 运行 是否已经找到匹配项,但它会不断循环遍历数据,即使我引发异常...
编辑 2
简单地说,我有以下...
for o in objects:
if too_close(o, existing_objects):
return 1
return 0
...我想在 CPU 之间分配...
for o in objects:
if too_close(o, some_existing_objects):
return 1 and abort other processes
return 0
通过寻找答案,我的脚本有点太复杂了。 我试图从接近原始文档的内容开始 多处理模块。 然后没有成功,我寻找一种方法来修复它并添加了一些东西。
我不是 python 多处理方面的专家,但是在尝试了一段时间之后,
我发现在第一场比赛中中止 pool.map
的唯一方法是使用事件
让所有进程都知道它发生了,然后他们都抛出一个特殊的异常
中止自己。
我可以去掉 Value 和 Lock,它们对我来说没用。
但我做事的方式可能不是很有效。 产生这些进程将花费大量的计算时间, 每个进程都会将它需要的数据复制到自己的内存中运行。
我尝试生成更少的进程,但每个进程的数据都更少,而且
他们将自己迭代的数据集(不让池处理这部分)。
所以我可以选择哪些数据进入哪个进程。
在我的示例中,我将 range(10000)
拆分为例如4道工序
每个都有2500的范围。
我只想知道有没有匹配,所以我可以进一步简化。 我可以设置当找到匹配时,设置事件和函数 returns 以便它停止。 另一个进程测试事件的状态,并在设置后立即 return 停止自己。
现在回到主流程,最后只看事件 (当然不要忘记在开始时清除它)。 如果已设置,则找到匹配项,就这么简单。
缺点是我必须声明multiprocessing.Event
全局...
否则当进程产生时,每个子进程都会复制它
他们将无法在它们之间以及与主进程进行通信。
但是正如 bj0 已经提到的,将这个问题并行化可能不会更好...
实现这两种方法后,我将它们与串行问题进行了比较,这是我的结果 对于同一台机器的给定案例:
- 序列号:7s
- 有游泳池:910s
- 有 3 个进程,每个进程都有自己的数据集:97s
所以这里没有什么更好的...我将坚持我的串行实现并寻找其他方法来加速事情,比如除了完全随机的其他方法...
这是我的 MWE 的最后一个工作版本:
#!/usr/bin/env python3
import multiprocessing as muproc
def ParallelCheck(me):
print(" Testing {}...".format(me))
global abort
abort.clear()
ParChild = ParallelChild(me)
jobs = []
N = 4
for i in range(N):
jobs.append(muproc.Process(target = ParChild.run, args=(range(i * 2500, (i+1) * 2500),)))
for p in jobs:
p.start()
for p in jobs:
p.join()
if abort.is_set():
print("MATCH FOUND")
return 1
else:
print(" no match...")
return 0
def Computation(me, neighbour):
return me == neighbour
class ParallelChild(object):
def __init__(self, me):
self.me = me
def run(self, neighbours):
global abort
for neighbour in neighbours:
print("{} vs {} by {}".format(self.me, neighbour, self.CurProc()))
if abort.is_set():
print("Aborting {}".format(self.CurProc()))
return 0
else:
if Computation(self.me, neighbour):
abort.set()
print("GOTCHA {}".format(self.CurProc()))
return 1
def CurProc(self):
return muproc.current_process()._identity[0]
if __name__ == "__main__":
abort = muproc.Event()
values = [12000, 130, 7]
for v in values:
print("value={} match={}".format(v, ParallelCheck(v)))