Python multiprocessing - starting next process after several seconds

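I'm a beginner in Python, so I would really appreciate a clear, easy-to-follow explanation.

In my Python script I have a function that creates several threads to perform an I/O-bound task (what it actually does is issue several Azure requests concurrently using the Azure Python SDK). I also have a list of time differences, e.g. [1 s, 3 s, 10 s, 5 s, ..., 7 s], and I want to run the function again after each time difference.

Say I want to run the function and then run it again 5 seconds later. The first run may take much longer than 5 seconds to finish, because it has to wait for the requests it issued to complete. So I want to run each call of the function in a separate process, so that the different runs do not block one another (even if they would not block one another without separate processes, I just don't want the threads of different runs to mix).

My code looks like this: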

import multiprocessing as mp
from time import sleep

def function(num_threads):
    """
    This function creates num_threads threads to make num_threads requests
    """

# Time to wait in seconds between each execution of the function
times = [1, 10, 7, 3, 13, 19]

# List of number of requests to make for each execution of the function
num_threads_list = [1, 2, 3, 4, 5, 6]

processes = []

for i in range(len(times)):
    p = mp.Process(target=function, args=[num_threads_list[i]])
    p.start()
    processes.append(p)

    sleep(times[i])

for process in processes:
    process.join()

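The questions I would like to ask:

  1. The list "times" is very long in my real script (i.e. 1000 entries). Given the time differences in "times", I estimate that at most 5 executions of the function run concurrently. I would like to know whether each process terminates once it has finished executing the function, so that there are in fact at most 5 processes running. Or does it stay around, so that there would be 1000 processes, which sounds odd given the number of CPU cores in my computer?

  2. If you think there is a better way to do what I described above, please let me know.

Thank you!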

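The main issue I see in your question is having a large number of processes running at the same time.

You can prevent that by maintaining a process list with a maximum length. Something like this: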

import multiprocessing as mp
from time import sleep
from random import randint


def function(num_threads):
    """
    This function creates num_threads threads to make num_threads requests
    """
    sleep(randint(3, 7))


# Time to wait in seconds between each execution of the function
times = [1, 10, 7, 3, 13, 19]

# List of number of requests to make for each execution of the function
num_threads_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

process_data_list = []
max_processes = 4


# =======================================================================================
def main():
    times_index = 0
    while times_index < len(times):

        # cleanup stopped processes -------------------------------
        cleanup_done = False
        while not cleanup_done:
            cleanup_done = True
            # search stopped processes
            for i, process_data in enumerate(process_data_list):
                if not process_data[1].is_alive():
                    print(f'process {process_data[0]} finished')
                    # remove from processes
                    p = process_data_list.pop(i)
                    del p
                    # start new search
                    cleanup_done = False
                    break

        # try start new process ---------------------------------
        if len(process_data_list) < max_processes:
            process = mp.Process(target=function, args=[num_threads_list[times_index]])
            process.start()
            process_data_list.append([times_index, process])
            print(f'process {times_index} started')
            times_index += 1
        else:
            sleep(0.1)

    # wait for all processes to finish --------------------------------
    while process_data_list:
        for i, process_data in enumerate(process_data_list):
            if not process_data[1].is_alive():
                print(f'process {process_data[0]} finished')
                # remove from processes
                p = process_data_list.pop(i)
                del p
                # start new search
                break
        else:
            # nothing finished in this pass: pause instead of busy-waiting
            sleep(0.1)

    print('ALL DONE !!!!!!')


# =======================================================================================
if __name__ == '__main__':
    main()

As you can see in the output, it starts max_processes processes right away.

process 0 started
process 1 started
process 2 started
process 3 started
process 3 finished
process 4 started
process 1 finished
process 5 started
process 0 finished
process 2 finished
process 5 finished
process 4 finished
ALL DONE !!!!!!
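
On the first question in the post: a process created with mp.Process exits when its target function returns, so the number of live processes follows the number of calls still running, not the length of "times". According to the multiprocessing documentation, finished children are also joined whenever a new process is started or is_alive() is called on them, so they do not pile up as zombies. A minimal sketch to check this yourself; work and run_and_inspect are illustrative names, not part of the code above:

```python
import multiprocessing as mp
from time import sleep


def work(seconds):
    # Stand-in for the real function: just block for a while
    sleep(seconds)


def run_and_inspect(seconds=0.2):
    p = mp.Process(target=work, args=(seconds,))
    p.start()
    running = p.is_alive()   # state while the target is still executing
    p.join()                 # wait for the target to return
    # After join(), the process is gone; exitcode 0 means a normal return
    return running, p.is_alive(), p.exitcode


if __name__ == '__main__':
    running, alive_after, exitcode = run_and_inspect()
    print(running, alive_after, exitcode)
```

The second and third values show that nothing is left running once the function has returned.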

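You can also do this with a timer, as in the code below. I deliberately gave process 2 a 15-second delay so that you can see it does indeed finish last, once its time has elapsed.

This code example has two main functions. The first, your_process_here(), is, as its name suggests, a placeholder for your own code. The second is a manager that launches the processes in slices to avoid overloading the system.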

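Parameters

max_process: total number of processes the script will launch

simultp: maximum number of concurrent processes

timegl: time guideline; for each process, the delay to wait, measured from the moment the parent started. The wait is therefore at least the time given in the guideline (relative to the parent's start time).

In other words, once its guideline time has elapsed, a process starts as soon as possible, subject to the maximum number of concurrent processes allowed.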
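
The waiting rule described above amounts to one line of arithmetic. A minimal sketch, where remaining_wait is a hypothetical helper (the answer's code computes the same difference inline) and the now parameter exists only to make the example deterministic:

```python
import time


def remaining_wait(guideline, parent_start, now=None):
    """Seconds still to wait so that a task starts no earlier than
    `guideline` seconds after `parent_start`; never negative."""
    if now is None:
        now = time.time()
    elapsed = now - parent_start
    return max(0.0, guideline - elapsed)


# A 15 s guideline, checked 0.01 s after the parent started
print(round(remaining_wait(15, parent_start=100.0, now=100.01), 2))  # 14.99
# A 0.22 s guideline, checked 3 s after the parent started: start at once
print(remaining_wait(0.22, parent_start=100.0, now=103.0))           # 0.0
```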

The example in this answer uses:

max_process = 6

simultp = 3

timegl = [1, 15, 1, 0.22, 6, 0.5] (for illustration only; an increasing series would be more logical here)
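
The remark about an increasing series can be made concrete. The question's "times" list holds gaps between launches, whereas timegl holds delays measured from the parent's start, so the gaps have to be summed cumulatively first. A minimal sketch; start_offsets is a hypothetical helper, not part of this answer's code, and it assumes, as in the question's loop, that the first run starts immediately and each gap follows a launch:

```python
from itertools import accumulate


def start_offsets(gaps):
    """Launch times, in seconds after the parent starts, for each run.

    gaps[i] is the pause after launch i, so the first run starts at 0
    and the last gap does not schedule another run.
    """
    if not gaps:
        return []
    return [0] + list(accumulate(gaps[:-1]))


times = [1, 10, 7, 3, 13, 19]
print(start_offsets(times))  # [0, 1, 11, 18, 21, 34]
```

The resulting list is increasing and could be passed as the time guideline.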

Result of the full script in the shell

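simultaneously launched processes :  3
process n°2 is active and will wait 14.99 seconds more before treatment function starts
process n°1 is active and will wait 0.98 seconds more before treatment function starts
process n°3 is active and will wait 0.98 seconds more before treatment function starts
---- process n°1 ended ----
---- process n°3 ended ----
simultaneously launched processes :  3
process n°5 is active and will wait 2.88 seconds more before treatment function starts
process n°4 is active and will start now
---- process n°4 ended ----
---- process n°5 ended ----
simultaneously launched processes :  2
process n°6 is active and will start now
---- process n°6 ended ----
---- process n°2 ended ----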

Code

import multiprocessing as mp
from threading import Timer
import time


def your_process_here(starttime, pnum, timegl):
    # Delay since the parent process started
    delay_since_pstart = time.time() - starttime
    # Time to sleep in order to follow the time guideline as closely as possible
    diff = timegl[pnum-1] - delay_since_pstart
    if diff > 0: # if the time elapsed since the parent started < guideline time
        print('process n°{0} is active and will wait {1} seconds more before treatment function starts'\
              .format(pnum, round(diff, 2)))        
        time.sleep(diff) # wait for X more seconds
    else:
        print('process n°{0} is active and will start now'.format(pnum))
    ########################################################
    ## PUT THE CODE AFTER SLEEP() TO START CODE WITH A DELAY
    ## if pnum == 1:
    ##     function1()  
    ## elif pnum == 2:
    ##     function2()
    ## ...
    print('---- process n°{0} ended ----'.format(pnum))

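def process_manager(max_process, simultp, timegl, starttime=0, pnum=1, launchp=None):
    # Avoid a mutable default argument: start with an empty launch history
    if launchp is None:
        launchp = []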
    # While your number of simultaneous current processes is less than simultp and
    # the historical number of processes is less than max_process
    while len(mp.active_children()) < simultp and len(launchp) < max_process:
        # Incrementation of the process number
        pnum = len(launchp) + 1
        # Start a new process
        mp.Process(target=your_process_here, args=(starttime, pnum, timegl)).start()
        # History of all unique processes launched so far
        launchp = list(set(launchp + mp.active_children()))        
    # ...
    ####### REMOVE THE 2 FOLLOWING LINES IN OPERATIONAL CODE #####################
    print('simultaneously launched processes : ', len(mp.active_children()))        
    time.sleep(3) # optional: a 3-second pause before the next slice of processes is handled
    ##############################################################################
    if pnum < max_process:
        delay_repeat = 0.1 # 100 ms
        # If all the processes have not been launched renew the operation
        Timer(delay_repeat, process_manager, (max_process, simultp, timegl, starttime, pnum, launchp)).start()        

if __name__ == '__main__':
    max_process = 6 # maximum of processes
    simultp = 3 # maximum of simultaneous processes to save resources
    timegl = [1, 15, 1, 0.22, 6, 0.5] # Time guideline
    starttime = time.time()
    process_manager(max_process, simultp, timegl, starttime)
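
For the second question in the post, another option (not taken from the code above) is to let the standard library cap the number of processes. A minimal sketch, assuming the real work can be passed as a picklable top-level function: ProcessPoolExecutor keeps at most max_workers worker processes and reuses them, while the parent sleeps between submissions as in the question's loop. Here function is a placeholder that only sleeps and returns its argument, run_schedule is an illustrative name, and the short gap values are chosen so the sketch finishes quickly.

```python
from concurrent.futures import ProcessPoolExecutor
from time import sleep


def function(num_threads):
    # Placeholder for the real work (threads issuing requests)
    sleep(0.2)
    return num_threads


def run_schedule(times, num_threads_list, max_workers=5):
    futures = []
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        for gap, n in zip(times, num_threads_list):
            # submit() returns immediately; the call runs in a worker process
            futures.append(pool.submit(function, n))
            # pause before the next launch, as in the question
            sleep(gap)
        # leaving the with-block waits for all submitted calls to finish
    return [f.result() for f in futures]


if __name__ == '__main__':
    print(run_schedule([0.1, 0.3, 0.1], [1, 2, 3], max_workers=2))
```

One design consequence to keep in mind: if every worker is busy when a call is submitted, that call waits in the pool's queue instead of starting on schedule, so max_workers should be at least the expected number of overlapping runs.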