Python 多处理代码运行良好,但不会终止
Python multiprocessing code runs fine, but does not terminate
我有这段代码(很抱歉,它几乎是我工作代码的精确复制粘贴。我不知道问题出在哪里,因此我把它全部放在这里):
def init(Q):
"""Serves to initialize the queue across all child processes"""
global q
q = Q
def queue_manager(q):
"""Listens on the queue, and writes pushed data to file"""
while True:
data = q.get()
if data is None:
break
key, preds = data
with pd.HDFStore(hdf_out, mode='a', complevel=5, complib='blosc') as out_store:
out_store.append(key, preds)
def writer(message):
"""Pushes messages to queue"""
q.put(message)
def reader(key):
"""Reads data from store, selects required days, processes it"""
try:
# Read the data
with pd.HDFStore(hdf_in, mode='r') as in_store:
df = in_store[key]
except KeyError as ke:
# Almost guaranteed to not happen
return (key, pd.DataFrame())
else:
# Executes only if exception is not raised
fit_df = df[(df.index >= '2016-09-11') & \
(df.index < '2016-09-25') & \
(df.index.dayofweek < 5)].copy()
pre_df = df[(df.index >= '2016-09-18') & \
(df.index < '2016-10-2') & \
(df.index.dayofweek < 5)].copy()
del df
# model_wrapper below is a custom function in another module.
# It works fine.
models, preds = model_wrapper(fit_df=fit_df, pre_df=pre_df)
if preds is not None:
writer((key, preds))
del preds
return (key, models)
def main():
sensors = pd.read_csv('sens_metadata.csv', index_col=[0])
nprocs = int(cpu_count() - 0)
maxproc = 10
q = Queue()
t = Thread(target=queue_manager, args=(q,))
print("Starting process at\t{}".format(dt.now().time()))
sys.stdout.flush()
t.start()
with Pool(processes=nprocs, maxtasksperchild=maxproc, initializer=init,
initargs=(q,)) as p:
models = p.map(reader, sensors.index.tolist(), 1)
print("Processing done at\t{}".format(dt.now().time()))
print("\nJoining Thread, and finishing writing predictions")
sys.stdout.flush()
q.put(None)
t.join()
print("Thread joined successfully at\t{}".format(dt.now().time()))
print("\nConcatenating models and serializing to pickle")
sys.stdout.flush()
pd.concat(dict(models)).to_pickle(path + 'models.pickle')
print("Pickled successfully at\t{}".format(dt.now().time()))
if __name__ == '__main__':
main()
这段代码的行为就像一次严重偏向的抛硬币。大多数时候它不起作用,有时它起作用。当它运行时,我知道完成 运行 整个数据(所有 keys
)大约需要 2.5 小时。 10 次运行中有 9 次,它将处理所有数据,我在 hdf_out
文件中看到数据,但多处理池没有加入。所有子进程都处于活动状态,但不做任何工作。我只是不明白为什么程序会这样挂起。
发生这种情况时,我看不到 "Processing done at ..."
和 "Joining Thread, ..."
消息。另外,如果我给它较小的数据集,它就会完成。如果我排除 preds
的计算,它就会结束。我不能排除 models
的计算而不进行大量修改,这将不利于项目的其余部分。
我不知道为什么会这样。我正在使用 Linux (Kubuntu 16.04).
显然删除 maxtaskperchild
kwag 可以解决问题。为什么是我不清楚的事情。我想这与 fork 进程(Linux 上的默认设置)和 spawn 进程(Windows 上的唯一选项)之间的区别有关。
使用 fork 进程 maxtaskperchild
显然不是必需的,因为没有它性能会更好。我注意到通过删除 maxtaskperchild
改进了内存使用。内存不会被子进程占用,而是与父进程共享。然而,在我不得不使用 Windows 的时候,maxtaskperchild
是防止子进程膨胀的关键方法,尤其是当 运行 具有长任务列表的内存密集型任务时。
知道情况更好的人,请随时编辑此答案。
我有这段代码(很抱歉,它几乎是我工作代码的精确复制粘贴。我不知道问题出在哪里,因此我把它全部放在这里):
def init(Q):
"""Serves to initialize the queue across all child processes"""
global q
q = Q
def queue_manager(q):
"""Listens on the queue, and writes pushed data to file"""
while True:
data = q.get()
if data is None:
break
key, preds = data
with pd.HDFStore(hdf_out, mode='a', complevel=5, complib='blosc') as out_store:
out_store.append(key, preds)
def writer(message):
"""Pushes messages to queue"""
q.put(message)
def reader(key):
"""Reads data from store, selects required days, processes it"""
try:
# Read the data
with pd.HDFStore(hdf_in, mode='r') as in_store:
df = in_store[key]
except KeyError as ke:
# Almost guaranteed to not happen
return (key, pd.DataFrame())
else:
# Executes only if exception is not raised
fit_df = df[(df.index >= '2016-09-11') & \
(df.index < '2016-09-25') & \
(df.index.dayofweek < 5)].copy()
pre_df = df[(df.index >= '2016-09-18') & \
(df.index < '2016-10-2') & \
(df.index.dayofweek < 5)].copy()
del df
# model_wrapper below is a custom function in another module.
# It works fine.
models, preds = model_wrapper(fit_df=fit_df, pre_df=pre_df)
if preds is not None:
writer((key, preds))
del preds
return (key, models)
def main():
sensors = pd.read_csv('sens_metadata.csv', index_col=[0])
nprocs = int(cpu_count() - 0)
maxproc = 10
q = Queue()
t = Thread(target=queue_manager, args=(q,))
print("Starting process at\t{}".format(dt.now().time()))
sys.stdout.flush()
t.start()
with Pool(processes=nprocs, maxtasksperchild=maxproc, initializer=init,
initargs=(q,)) as p:
models = p.map(reader, sensors.index.tolist(), 1)
print("Processing done at\t{}".format(dt.now().time()))
print("\nJoining Thread, and finishing writing predictions")
sys.stdout.flush()
q.put(None)
t.join()
print("Thread joined successfully at\t{}".format(dt.now().time()))
print("\nConcatenating models and serializing to pickle")
sys.stdout.flush()
pd.concat(dict(models)).to_pickle(path + 'models.pickle')
print("Pickled successfully at\t{}".format(dt.now().time()))
if __name__ == '__main__':
main()
这段代码的行为就像一次严重偏向的抛硬币。大多数时候它不起作用,有时它起作用。当它运行时,我知道完成 运行 整个数据(所有 keys
)大约需要 2.5 小时。 10 次运行中有 9 次,它将处理所有数据,我在 hdf_out
文件中看到数据,但多处理池没有加入。所有子进程都处于活动状态,但不做任何工作。我只是不明白为什么程序会这样挂起。
发生这种情况时,我看不到 "Processing done at ..."
和 "Joining Thread, ..."
消息。另外,如果我给它较小的数据集,它就会完成。如果我排除 preds
的计算,它就会结束。我不能排除 models
的计算而不进行大量修改,这将不利于项目的其余部分。
我不知道为什么会这样。我正在使用 Linux (Kubuntu 16.04).
显然删除 maxtaskperchild
kwag 可以解决问题。为什么是我不清楚的事情。我想这与 fork 进程(Linux 上的默认设置)和 spawn 进程(Windows 上的唯一选项)之间的区别有关。
使用 fork 进程 maxtaskperchild
显然不是必需的,因为没有它性能会更好。我注意到通过删除 maxtaskperchild
改进了内存使用。内存不会被子进程占用,而是与父进程共享。然而,在我不得不使用 Windows 的时候,maxtaskperchild
是防止子进程膨胀的关键方法,尤其是当 运行 具有长任务列表的内存密集型任务时。
知道情况更好的人,请随时编辑此答案。