How to do the same with multiprocessing pool and not process?

Here I am trying to build a final dictionary where the key should be the file name and the value should be the number of rows in that file. I can do this with the multiprocessing Process class, but the main problem there is that I have to split all the files into chunks manually. I want to use a Pool instead, because I don't want to do that chunking exercise myself. How can I use a Pool together with a Manager to get the same result?

import pandas as pd
import datetime
import os
import multiprocessing
import time
all_files = os.listdir('E:\\2nd Set\\')  # backslashes must be doubled inside a normal string

def rule(files, main_list):
    # Record each file's row count in the shared (Manager) dict
    for file in files:
        df = pd.read_csv('E:\\2nd Set\\' + file)
        main_list[file] = df.shape[0]

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    main_list = mgr.dict()
    # The file list has to be split into chunks by hand:
    p1 = multiprocessing.Process(target=rule, args=(all_files[:800], main_list))
    p2 = multiprocessing.Process(target=rule, args=(all_files[800:], main_list))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print(main_list)

The following should work. As I have mentioned elsewhere, this may not be a good candidate for multiprocessing or multithreading unless you have a solid-state drive. I have set the pool size to 2, which is effectively equivalent to your original program that uses two Process instances. You can try increasing the pool size and see how it affects performance (see my note at the end about rebooting between successive runs to clear the disk cache and get accurate timings), but I suspect it will hurt rather than help: your disk can only sustain so much throughput, and having several processes or threads reading files concurrently just makes the disk head seek back and forth, again unless you have a solid-state drive. I have also included a multithreaded version.

import pandas as pd
import os
import multiprocessing
import time
from functools import partial


DIR = 'E:\\2nd Set\\'  # a raw string cannot end in a backslash, so escape them instead

def rule(file, main_list):
    df = pd.read_csv(DIR + file)
    main_list[file] = df.shape[0]

if __name__=='__main__':
    start_time = time.time()
    # Compute the file list here (not at module level) so that it is
    # not needlessly recomputed by each subprocess when this is run
    # under Windows:
    all_files = os.listdir(DIR)
    mgr = multiprocessing.Manager()
    main_list = mgr.dict()
    # Too many threads or processes can really hurt you because of
    # disk contention. The disk can only support so much concurrent
    # I/O
    MAX_PROCESSES = 2 # The equivalent of the original program
    pool = multiprocessing.Pool(MAX_PROCESSES)
    worker = partial(rule, main_list=main_list)
    pool.map(worker, all_files)
    pool.close()
    pool.join()
    elapsed = time.time() - start_time

    print(main_list)
    print(elapsed)
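
As an aside, once you are using a Pool you do not strictly need the Manager at all: Pool.map already ships each worker's return value back to the parent process, so the worker can just return a (file, row_count) pair and the parent can assemble the dictionary itself. A minimal sketch of that variant (assuming the same directory as above), which also avoids the per-item round trip to the Manager process:

import pandas as pd
import os
import multiprocessing

DIR = 'E:\\2nd Set\\'

def rule(file):
    # Return the result instead of writing to a shared dict;
    # Pool.map collects the return values in the parent process.
    df = pd.read_csv(DIR + file)
    return file, df.shape[0]

if __name__ == '__main__':
    all_files = os.listdir(DIR)
    with multiprocessing.Pool(2) as pool:
        # chunksize batches many filenames into each task message,
        # reducing inter-process communication overhead:
        main_list = dict(pool.map(rule, all_files, chunksize=100))
    print(main_list)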

You can try doing this with multithreading and see whether performance improves. However, you need to reboot your machine first to ensure all disk caches are flushed before you compare the multiprocessing timing against the multithreading timing, or indeed before any two successive runs of the code. Here is the multithreaded version, which can use a regular dictionary:

import pandas as pd
import os
from multiprocessing.pool import ThreadPool
import time
from functools import partial


DIR = 'E:\\2nd Set\\'

def rule(file, main_list):
    df = pd.read_csv(DIR + file)
    main_list[file] = df.shape[0]

if __name__=='__main__':
    start_time = time.time()
    all_files = os.listdir(DIR)
    main_list = {}
    # Too many threads or processes can really hurt you because of
    # disk contention. The disk can only support so much concurrent
    # I/O
    MAX_THREADS = 2 # The equivalent of the original program
    pool = ThreadPool(MAX_THREADS)
    worker = partial(rule, main_list=main_list)
    pool.map(worker, all_files)
    pool.close()
    pool.join()
    elapsed = time.time() - start_time

    print(main_list)
    print(elapsed)
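
For completeness, the multithreaded version can also be written with the standard library's concurrent.futures module; this is just an alternative spelling of the same thread-pool idea, under the same assumptions about the directory:

import pandas as pd
import os
from concurrent.futures import ThreadPoolExecutor

DIR = 'E:\\2nd Set\\'

def rule(file):
    df = pd.read_csv(DIR + file)
    return file, df.shape[0]

if __name__ == '__main__':
    all_files = os.listdir(DIR)
    # Two worker threads, matching MAX_THREADS above:
    with ThreadPoolExecutor(max_workers=2) as executor:
        # executor.map yields results in input order:
        main_list = dict(executor.map(rule, all_files))
    print(main_list)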