Python multiprocessing - starting next process after several seconds
I am a beginner in Python, so I would appreciate a clear, easy-to-understand explanation.
In my Python script I have a function that spawns multiple threads to perform an I/O-bound task (what it actually does is fire several Azure requests concurrently using the Azure Python SDK), and I have a list of time intervals, e.g. [1 s, 3 s, 10 s, 5 s, ..., 7 s], so that I run the function again after each interval.
Say I want to run the function and then run it again 5 seconds later. The first run may take more than 5 seconds to finish, because it has to wait for the requests it makes to complete. So I want to run each call of the function in a separate process, so that different executions of the function don't block each other (even if they wouldn't block each other without separate processes, I just don't want threads from different executions to mix).
My code looks like this:
import multiprocessing as mp
from time import sleep

def function(num_threads):
    """
    This functions makes num_threads number of threads to make num_threads number of requests
    """

# Time to wait in seconds between each execution of the function
times = [1, 10, 7, 3, 13, 19]
# List of number of requests to make for each execution of the function
num_threads_list = [1, 2, 3, 4, 5, 6]

processes = []
for i in range(len(times)):
    p = mp.Process(target=function, args=[num_threads_list[i]])
    p.start()
    processes.append(p)
    sleep(times[i])

for process in processes:
    process.join()
My question:
The length of the "times" list is very large in my real script (i.e. 1000). Given the intervals in "times", I estimate that at most 5 executions of the function will be running concurrently. I'd like to know whether each process terminates once its function call finishes, so that in practice at most 5 processes are running, or whether they all stick around so that there end up being 1000 processes, which sounds odd given the number of CPU cores on my machine?
If you think there is a better way to do what I described above, please let me know.
Thanks!
The main problem I take from your question is the large number of processes running simultaneously.
You can prevent that by maintaining a list of processes with a maximum length, like this:
import multiprocessing as mp
from time import sleep
from random import randint

def function(num_threads):
    """
    This functions makes num_threads number of threads to make num_threads number of requests
    """
    sleep(randint(3, 7))

# Time to wait in seconds between each execution of the function
times = [1, 10, 7, 3, 13, 19]
# List of number of requests to make for each execution of the function
num_threads_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

process_data_list = []
max_processes = 4

# =======================================================================================
def main():
    times_index = 0
    while times_index < len(times):
        # cleanup stopped processes -------------------------------
        cleanup_done = False
        while not cleanup_done:
            cleanup_done = True
            # search stopped processes
            for i, process_data in enumerate(process_data_list):
                if not process_data[1].is_alive():
                    print(f'process {process_data[0]} finished')
                    # remove from processes
                    p = process_data_list.pop(i)
                    del p
                    # start new search
                    cleanup_done = False
                    break
        # try start new process ---------------------------------
        if len(process_data_list) < max_processes:
            process = mp.Process(target=function, args=[num_threads_list[times_index]])
            process.start()
            process_data_list.append([times_index, process])
            print(f'process {times_index} started')
            times_index += 1
        else:
            sleep(0.1)
    # wait for all processes to finish --------------------------------
    while process_data_list:
        for i, process_data in enumerate(process_data_list):
            if not process_data[1].is_alive():
                print(f'process {process_data[0]} finished')
                # remove from processes
                p = process_data_list.pop(i)
                del p
                # start new search
                break
    print('ALL DONE !!!!!!')

# =======================================================================================
if __name__ == '__main__':
    main()
As you can see in the output, it immediately runs max_processes processes at once.
process 0 started
process 1 started
process 2 started
process 3 started
process 3 finished
process 4 started
process 1 finished
process 5 started
process 0 finished
process 2 finished
process 5 finished
process 4 finished
ALL DONE !!!!!!
You can also do this job with a timer, as in the code below.
I deliberately gave thread 2 a 15-second delay so you can see that it indeed finishes in last position once its time is up.
This code sample has two main functions.
The first one, your_process_here(), is, as its name says, waiting for your own code.
The second one is a manager that organizes the slices of threads to launch so the system isn't overloaded.
Parameters
max_process: total number of processes the script will run
simultp: maximum number of simultaneous processes
timegl: the time guideline, which defines the waiting time of each thread since the parent started. So the waiting time is at least the one defined in the guideline (relative to the parent's start time).
In other words, once its guideline time has elapsed, each thread starts as soon as possible, given the maximum number of simultaneous threads allowed.
In this example
max_process = 6
simultp = 3
timegl = [1, 15, 1, 0.22, 6, 0.5] (just for the explanation, since it would be more logical to have an increasing series there)
Result in the shell
simultaneously launched processes :  3
process n°2 is active and will wait 14.99 seconds more before treatment function starts
process n°1 is active and will wait 0.98 seconds more before treatment function starts
process n°3 is active and will wait 0.98 seconds more before treatment function starts
---- process n°1 ended ----
---- process n°3 ended ----
simultaneously launched processes :  3
process n°5 is active and will wait 2.88 seconds more before treatment function starts
process n°4 is active and will start now
---- process n°4 ended ----
---- process n°5 ended ----
simultaneously launched processes :  2
process n°6 is active and will start now
---- process n°6 ended ----
---- process n°2 ended ----
Code
import multiprocessing as mp
from threading import Timer
import time

def your_process_here(starttime, pnum, timegl):
    # Delay since the parent thread started
    delay_since_pstart = time.time() - starttime
    # Time to sleep in order to follow the time guideline as closely as possible
    diff = timegl[pnum - 1] - delay_since_pstart
    if diff > 0:  # if time elapsed since parent start < guideline time
        print('process n°{0} is active and will wait {1} seconds more before treatment function starts'
              .format(pnum, round(diff, 2)))
        time.sleep(diff)  # wait for X more seconds
    else:
        print('process n°{0} is active and will start now'.format(pnum))
    ########################################################
    ## PUT THE CODE AFTER SLEEP() TO START CODE WITH A DELAY
    ## if pnum == 1:
    ##     function1()
    ## elif pnum == 2:
    ##     function2()
    ## ...
    print('---- process n°{0} ended ----'.format(pnum))

def process_manager(max_process, simultp, timegl, starttime=0, pnum=1, launchp=[]):
    # While the number of simultaneous current processes is less than simultp and
    # the historical number of processes is less than max_process
    while len(mp.active_children()) < simultp and len(launchp) < max_process:
        # Increment the process number
        pnum = len(launchp) + 1
        # Start a new process
        mp.Process(target=your_process_here, args=(starttime, pnum, timegl)).start()
        # History of all launched unique processes
        launchp = list(set(launchp + mp.active_children()))
    ####### THESE 2 FOLLOWING LINES ARE TO BE DELETED IN OPERATIONAL CODE ########
    print('simultaneously launched processes : ', len(mp.active_children()))
    time.sleep(3)  # optional: a 3-second break before the next slice of processes
    ##############################################################################
    if pnum < max_process:
        delay_repeat = 0.1  # 100 ms
        # If all the processes have not been launched, renew the operation
        Timer(delay_repeat, process_manager, (max_process, simultp, timegl, starttime, pnum, launchp)).start()

if __name__ == '__main__':
    max_process = 6  # maximum number of processes
    simultp = 3  # maximum number of simultaneous processes to save resources
    timegl = [1, 15, 1, 0.22, 6, 0.5]  # time guideline
    starttime = time.time()
    process_manager(max_process, simultp, timegl, starttime)