运行 在 Python 中具有不同数据集的多核模型
Running models on multiple cores with different data sets in Python
我有一个包含多个数据集的文件夹,我想 运行 在这些数据集上建立模型并将负载分布到多个内核,希望能增加数据处理的总体 运行 时间.
我的电脑有8个核心。这是我在下面的第一次尝试,它只是一个草图,但使用 htop
,我可以看到这项工作只使用了一个核心。多核新手在此
import pandas as pd
import multiprocessing
import os
from library_example import model_example
def worker(file_):
to_save = pd.Series()
with open(file_,'r') as f_open:
data = f_open.read()
# Run model
model_results = model_example(file_)
# Save results in DataFrame
to_save.to_csv(file_[:-4]+ "_results.csv", model_results )
file_location_ = "/home/datafiles/"
if __name__ == '__main__':
for filename in os.listdir(file_location_):
p = multiprocessing.Process(target=worker, args=(file_location_ + filename,))
p.start()
p.join()
尝试移出 p.join()
。这将 等待 流程完成,当您开始流程(即 start
)然后等待每个流程(即 join
).相反,你可以尝试这样的事情:
# construct the workers
workers = [multiprocessing.Process(target=worker, args=(file_location_ + filename,)) for filename in os.listdir(file_location_)]
# start them
for proc in workers:
proc.start()
# now we wait for them
for proc in workers:
proc.join()
(我没有在你的代码中尝试 运行 这个,但类似的东西应该可以工作。)
EDIT 如果您想限制 workers/processes 的数量,那么我建议只使用 Pool
。您可以指定要使用的进程数,然后 map(..)
这些进程的参数。示例:
# construct a pool of workers
pool = multiprocessing.Pool(6)
pool.map(worker, [file_location_ + filename for filename in os.listdir(file_location_)])
pool.close()
我有一个包含多个数据集的文件夹,我想 运行 在这些数据集上建立模型并将负载分布到多个内核,希望能增加数据处理的总体 运行 时间.
我的电脑有8个核心。这是我在下面的第一次尝试,它只是一个草图,但使用 htop
,我可以看到这项工作只使用了一个核心。多核新手在此
import pandas as pd
import multiprocessing
import os
from library_example import model_example
def worker(file_):
to_save = pd.Series()
with open(file_,'r') as f_open:
data = f_open.read()
# Run model
model_results = model_example(file_)
# Save results in DataFrame
to_save.to_csv(file_[:-4]+ "_results.csv", model_results )
file_location_ = "/home/datafiles/"
if __name__ == '__main__':
for filename in os.listdir(file_location_):
p = multiprocessing.Process(target=worker, args=(file_location_ + filename,))
p.start()
p.join()
尝试移出 p.join()
。这将 等待 流程完成,当您开始流程(即 start
)然后等待每个流程(即 join
).相反,你可以尝试这样的事情:
# construct the workers
workers = [multiprocessing.Process(target=worker, args=(file_location_ + filename,)) for filename in os.listdir(file_location_)]
# start them
for proc in workers:
proc.start()
# now we wait for them
for proc in workers:
proc.join()
(我没有在你的代码中尝试 运行 这个,但类似的东西应该可以工作。)
EDIT 如果您想限制 workers/processes 的数量,那么我建议只使用 Pool
。您可以指定要使用的进程数,然后 map(..)
这些进程的参数。示例:
# construct a pool of workers
pool = multiprocessing.Pool(6)
pool.map(worker, [file_location_ + filename for filename in os.listdir(file_location_)])
pool.close()