Python: multiprocessing - 在一个进程完成后终止其他进程
Python: multiprocessing - terminate other processes after one process finished
我有一些程序,其中多个进程试图完成一些功能。我现在的目标是在一个进程成功完成功能后停止所有其他进程。
不幸的是,下面显示的python程序一直等到所有进程都成功解决了find函数中给出的问题。我该如何解决我的问题?
import multiprocessing
import random
FIND = 50
MAX_COUNT = 100000
INTERVAL = range(10)
def find(process, initial, return_dict):
succ = False
while succ == False:
start=initial
while(start <= MAX_COUNT):
if(FIND == start):
return_dict[process] = f"Found: {process}, start: {initial}"
succ = True
break;
i = random.choice(INTERVAL)
start = start + i
print(start)
processes = []
manager = multiprocessing.Manager()
return_code = manager.dict()
for i in range(5):
process = multiprocessing.Process(target=find, args=(f'computer_{i}', i, return_code))
processes.append(process)
process.start()
for process in processes:
process.join()
print(return_code.values())
输出可以是例如:
['Found: computer_0, start: 0', 'Found: computer_4, start: 4', 'Found: computer_2, start: 2', 'Found: computer_1, start: 1', 'Found: computer_3, start: 3']
但是这个输出显示程序正在等待所有进程完成...
您可以使用 multiprocessing.Queue
和 multiprocessing.Queue.get
执行此操作。这是如何工作的,默认情况下 get
会阻塞,直到队列中有内容。因此它将 return 附加到队列的第一个结果,即完成搜索的进程之一。之后,我们可以遍历进程并终止每个进程(请注意,除非 daemon
设置为 True,否则终止进程不会杀死进程产生的子进程)。
import multiprocessing
import random
import time
FIND = 50
MAX_COUNT = 100000
INTERVAL = range(10)
queue = multiprocessing.Queue(maxsize=1)
def find(process, initial):
succ = False
while succ == False:
start=initial
while(start <= MAX_COUNT):
if(FIND == start):
queue.put(f"Found: {process}, start: {initial}")
break;
i = random.choice(INTERVAL)
start = start + i
print(process, start)
processes = []
manager = multiprocessing.Manager()
for i in range(5):
process = multiprocessing.Process(target=find, args=(f'computer_{i}', i))
processes.append(process)
process.start()
ret = queue.get()
for i in range(5):
process = processes[i]
process.terminate()
print(f'terminated {i}')
print(ret)
您可能还想查看 setting the daemon,它会在主进程退出后终止进程。
使用 Event
来管理进程是否应保持 运行。
基本上,它将 succ
替换为适用于所有进程的东西。
import multiprocessing
import random
FIND = 50
MAX_COUNT = 1000
def find(process, initial, return_dict, run):
while run.is_set():
start = initial
while start <= MAX_COUNT:
if FIND == start:
return_dict[process] = f"Found: {process}, start: {initial}"
run.clear() # Stop running.
break
start += random.randrange(0, 10)
print(start)
if __name__ == "__main__":
processes = []
manager = multiprocessing.Manager()
return_code = manager.dict()
run = manager.Event()
run.set() # We should keep running.
for i in range(5):
process = multiprocessing.Process(
target=find, args=(f"computer_{i}", i, return_code, run)
)
processes.append(process)
process.start()
for process in processes:
process.join()
print(return_code.values())
请注意, 必须使用 __name__
才能使多处理在 ms-windows 和 macOS 上正常工作。
在这些系统上,主模块 导入 到新创建的 Python 进程中。这需要没有副作用,例如启动进程,__name__
机制确保了这一点。
我有一些程序,其中多个进程试图完成一些功能。我现在的目标是在一个进程成功完成功能后停止所有其他进程。
不幸的是,下面显示的python程序一直等到所有进程都成功解决了find函数中给出的问题。我该如何解决我的问题?
import multiprocessing
import random
FIND = 50
MAX_COUNT = 100000
INTERVAL = range(10)
def find(process, initial, return_dict):
succ = False
while succ == False:
start=initial
while(start <= MAX_COUNT):
if(FIND == start):
return_dict[process] = f"Found: {process}, start: {initial}"
succ = True
break;
i = random.choice(INTERVAL)
start = start + i
print(start)
processes = []
manager = multiprocessing.Manager()
return_code = manager.dict()
for i in range(5):
process = multiprocessing.Process(target=find, args=(f'computer_{i}', i, return_code))
processes.append(process)
process.start()
for process in processes:
process.join()
print(return_code.values())
输出可以是例如:
['Found: computer_0, start: 0', 'Found: computer_4, start: 4', 'Found: computer_2, start: 2', 'Found: computer_1, start: 1', 'Found: computer_3, start: 3']
但是这个输出显示程序正在等待所有进程完成...
您可以使用 multiprocessing.Queue
和 multiprocessing.Queue.get
执行此操作。这是如何工作的,默认情况下 get
会阻塞,直到队列中有内容。因此它将 return 附加到队列的第一个结果,即完成搜索的进程之一。之后,我们可以遍历进程并终止每个进程(请注意,除非 daemon
设置为 True,否则终止进程不会杀死进程产生的子进程)。
import multiprocessing
import random
import time
FIND = 50
MAX_COUNT = 100000
INTERVAL = range(10)
queue = multiprocessing.Queue(maxsize=1)
def find(process, initial):
succ = False
while succ == False:
start=initial
while(start <= MAX_COUNT):
if(FIND == start):
queue.put(f"Found: {process}, start: {initial}")
break;
i = random.choice(INTERVAL)
start = start + i
print(process, start)
processes = []
manager = multiprocessing.Manager()
for i in range(5):
process = multiprocessing.Process(target=find, args=(f'computer_{i}', i))
processes.append(process)
process.start()
ret = queue.get()
for i in range(5):
process = processes[i]
process.terminate()
print(f'terminated {i}')
print(ret)
您可能还想查看 setting the daemon,它会在主进程退出后终止进程。
使用 Event
来管理进程是否应保持 运行。
基本上,它将 succ
替换为适用于所有进程的东西。
import multiprocessing
import random
FIND = 50
MAX_COUNT = 1000
def find(process, initial, return_dict, run):
while run.is_set():
start = initial
while start <= MAX_COUNT:
if FIND == start:
return_dict[process] = f"Found: {process}, start: {initial}"
run.clear() # Stop running.
break
start += random.randrange(0, 10)
print(start)
if __name__ == "__main__":
processes = []
manager = multiprocessing.Manager()
return_code = manager.dict()
run = manager.Event()
run.set() # We should keep running.
for i in range(5):
process = multiprocessing.Process(
target=find, args=(f"computer_{i}", i, return_code, run)
)
processes.append(process)
process.start()
for process in processes:
process.join()
print(return_code.values())
请注意, 必须使用 __name__
才能使多处理在 ms-windows 和 macOS 上正常工作。
在这些系统上,主模块 导入 到新创建的 Python 进程中。这需要没有副作用,例如启动进程,__name__
机制确保了这一点。