Python - 多处理 - CPU 的更多作业。

Python - Multiprocessing - More jobs that cpus.

我知道我可以使用池 class 并且可能得到我需要的东西,但我想更好地控制我的问题。我的工作比处理者多,所以我不希望他们一次运行。

例如:

from multiprocessing import Process,cpu_count
for dir_name in directories:
    src_dir = os.path.join(top_level,dir_name)
    dst_dir = src_dir.replace(args.src_dir,args.target_dir)
    p = Process(target=transfer_directory, args=(src_dir, dst_dir,))
    p.start()

但是,如果我有超过 16 个目录,那么我将启动比处理器更多的作业。这是我的解决方案,真的很 hack。

from multiprocessing import Process,cpu_count
jobs = []
for dir_name in directories:
    src_dir = os.path.join(top_level,dir_name)
    dst_dir = src_dir.replace(args.src_dir,args.target_dir)
    p = Process(target=transfer_directory, args=(src_dir, dst_dir,))
    jobs.append(p)

alive_jobs = []
while jobs:
    if len(alive_jobs) >= cpu_count():
        time.sleep(5)
        print alive_jobs
        for aj in alive_jobs:
            if aj.is_alive():
                continue
            else:
                print "job {} removed".format(aj)
                alive_jobs.remove(aj)

        continue

    for job in jobs:
        if job.is_alive():
            continue
        job.start()
        alive_jobs.append(job)
        print alive_jobs
        jobs.remove(job)
        if len(alive_jobs) >= cpu_count():
            break

使用内置工具是否有更好的解决方案?

我想在这里分享我的想法:创建数量等于 cpu_count() 的进程,使用队列存储所有目录,并将队列传递给您的 transfer_directory 方法,采用 dir_name 一旦进程完成其工作,就从队列中退出。草稿如下所示:

NUM_OF_PROCESSES = multiprocessing.cpu_count()
TIME_OUT_IN_SECONDS = 60

for dir_name in directories:
    my_queue.put(dir_name)

# creates processes that equals to number of CPU 
processes = [multiprocessing.Process(target=transfer_directory, args=(my_queue,)) for x in range(NUM_OF_PROCESSES)]

# starts processes
for p in processes:
    p.start()

# blocks the calling thread
for p in processes:
    p.join()



def transfer_directory(my_queue):
    """processes element of directory queue if queue is not empty"""
    while my_queue is not empty:
        dir_name = my_queue.get(timeout=TIME_OUT_IN_SECONDS)
        src_dir = os.path.join(top_level,dir_name)
        dst_dir = src_dir.replace(args.src_dir,args.target_dir)

编辑: 读取大文件也很高效。 一段时间以来,我一直在努力如何使用 multiprocessing 读取一个巨大的文件(超过 1000 万行),最后我使用一个进程启动 producer(a_queue) 来读取行并将行放入队列,然后启动多个consumers(a_queue)a_queue取线,做耗时的工作,对我来说很正常。