Python : Multiprocessing with a huge dataframe is pretty slow
I am doing data analysis for a particle physics experiment. I have a huge dataframe with several million rows. Here is a sample of the dataframe's structure:
n_events event_number channel t v
0 200 0 1.0 [0.0, 0.292229, 0.44511900000000004, 0.686493,... [0.007145071463695994, 0.006022061677328001, 0...
1 200 0 2.0 [0.0, 0.28361, 0.43580699999999994, 0.70387699... [-0.004745911500159997, -0.004473575244724004,...
2 200 0 3.0 [0.0, 0.290339, 0.44740100000000005, 0.700787,... [-0.0016976513865119857, -0.000588632718491007...
3 200 0 4.0 [0.0, 0.299564, 0.435033, 0.701605, 0.7830996,... [0.000518971074000005, 0.0015031308265279996, ...
4 200 0 5.0 [0.0, 0.295462, 0.43185, 0.689991, 0.804407, 1... [-0.012224856444671991, -0.012481382646527987,...
... ... ... ... ... ...
7195 200 199 29.0 [0.0, 0.273977, 0.394305, 0.64364, 0.781702, 1... [-0.006007219163999997, -0.006066839886615997,...
7196 200 199 30.0 [0.0, 0.296464, 0.408051, 0.660629, 0.797999, ... [-0.0071352449638400085, -0.007720353590299007...
7197 200 199 31.0 [0.0, 0.271873, 0.39696299999999995, 0.661967,... [0.0007936306725499976, 0.0006720786866000081,...
7198 200 199 32.0 [0.0, 0.274194, 0.390268, 0.652645, 0.794755, ... [0.008792689244320001, 0.008201458288511989, 0...
7199 200 199 36.0 [0.0, 0.281648, 0.402901, 0.656489, 0.793202, ... [0.9035749247410491, 0.899703268236, 0.8972429...
And here is the same dataframe restricted to a single event, showing all 36 channels of event 0:
n_events event_number channel t v
0 200 0 1.0 [0.0, 0.292229, 0.44511900000000004, 0.686493,... [0.007145071463695994, 0.006022061677328001, 0...
1 200 0 2.0 [0.0, 0.28361, 0.43580699999999994, 0.70387699... [-0.004745911500159997, -0.004473575244724004,...
2 200 0 3.0 [0.0, 0.290339, 0.44740100000000005, 0.700787,... [-0.0016976513865119857, -0.000588632718491007...
3 200 0 4.0 [0.0, 0.299564, 0.435033, 0.701605, 0.7830996,... [0.000518971074000005, 0.0015031308265279996, ...
4 200 0 5.0 [0.0, 0.295462, 0.43185, 0.689991, 0.804407, 1... [-0.012224856444671991, -0.012481382646527987,...
5 200 0 6.0 [0.0, 0.304114, 0.431968, 0.6919690000000001, ... [-0.006504729964460993, -0.006973892963776013,...
6 200 0 7.0 [0.0, 0.296276, 0.435403, 0.694577, 0.801506, ... [0.0076502175278079804, 0.007291070848924005, ...
7 200 0 8.0 [0.0, 0.302246, 0.43909600000000004, 0.707817,... [0.005709271868173, 0.005525502225727999, 0.00...
8 200 0 33.0 [0.0, 0.312825, 0.450079, 0.7171700000000001, ... [0.6907829934142, 0.65032811641518, 0.63168262...
9 200 0 9.0 [0.0, 0.162366, 0.41691300000000003, 0.546435,... [-0.004370937556799992, -0.004156079620345007,...
10 200 0 10.0 [0.0, 0.168124, 0.430847, 0.551334, 0.810323, ... [0.005713839258880994, 0.005151084024971997, 0...
11 200 0 11.0 [0.0, 0.166088, 0.43300900000000003, 0.5373290... [-0.006496423078124987, -0.0059013241952349995...
12 200 0 12.0 [0.0, 0.133375, 0.4204, 0.521838, 0.803831, 0.... [-0.0029161667740250116, -0.004175291108256005...
13 200 0 13.0 [0.0, 0.126736, 0.425956, 0.530385, 0.811472, ... [-0.0045837650761579906, -0.003585277339199997...
14 200 0 14.0 [0.0, 0.134787, 0.419325, 0.527267, 0.808555, ... [-0.001530516654803007, -0.0011034030005499943...
15 200 0 15.0 [0.0, 0.1509, 0.444928, 0.5294964, 0.8145954, ... [-0.00559784605927201, -0.004645566291229007, ...
16 200 0 16.0 [0.0, 0.134896, 0.422334, 0.538761, 0.81445200... [0.009337214123531992, 0.009593533354463982, 0...
17 200 0 34.0 [0.0, 0.13378, 0.409487, 0.528365, 0.791995, 0... [0.809813636495625, 0.7967833234162501, 0.7681...
18 200 0 17.0 [0.0, 0.240012, 0.402098, 0.6908080000000001, ... [-0.002736104546672001, -0.0034600646495999824...
19 200 0 18.0 [0.0, 0.256932, 0.40772600000000003, 0.694829,... [-0.004761098550782997, -0.004992603885120003,...
20 200 0 19.0 [0.0, 0.260573, 0.411907, 0.7032830000000001, ... [-0.0020984008527860022, -0.001233839987092994...
21 200 0 20.0 [0.0, 0.260196, 0.395066, 0.7063619999999999, ... [0.005039756488365984, 0.006215568753132001, 0...
22 200 0 21.0 [0.0, 0.26385, 0.377695, 0.690218, 0.792184, 1... [0.0012567172712239978, 0.0017176489079889995,...
23 200 0 22.0 [0.0, 0.281995, 0.414246, 0.7113590000000001, ... [-0.004021068645631988, -0.0038137520037749995...
24 200 0 23.0 [0.0, 0.274982, 0.39725, 0.7177819999999999, 0... [0.00020083088635199054, -7.622279512499036e-0...
25 200 0 24.0 [0.0, 0.26048, 0.37926099999999996, 0.71315, 0... [0.015447624570974998, 0.015128326144224, 0.01...
26 200 0 35.0 [0.0, 0.260156, 0.404854, 0.6990419999999999, ... [0.826655732310342, 0.8016246495956479, 0.7899...
27 200 0 25.0 [0.0, 0.161695, 0.420327, 0.576986, 0.853091, ... [-0.01176219648990401, -0.011605116890357987, ...
28 200 0 26.0 [0.0, 0.142957, 0.414581, 0.5652809999999999, ... [-0.00017965899878400054, 0.000329458201304994...
29 200 0 27.0 [0.0, 0.151938, 0.42064199999999996, 0.544481,... [-0.012079087800160012, -0.011521835995015996,...
30 200 0 28.0 [0.0, 0.133352, 0.41161899999999996, 0.53848, ... [-0.005415268565248003, -0.0057859392168489975...
31 200 0 29.0 [0.0, 0.126623, 0.41861800000000005, 0.545479,... [-0.012657557405900003, -0.012799759762259007,...
32 200 0 30.0 [0.0, 0.117088, 0.41099399999999997, 0.55351, ... [-0.013853044528840005, -0.013165832426528, -0...
33 200 0 31.0 [0.0, 0.138599, 0.425755, 0.531386, 0.830847, ... [-0.005198338900159984, -0.006477930675, -0.00...
34 200 0 32.0 [0.0, 0.128048, 0.389703, 0.548266, 0.835658, ... [0.004916808473749997, 0.005265623096212002, 0...
35 200 0 36.0 [0.0, 0.129798, 0.41002700000000003, 0.551741,... [0.8577803214022401, 0.84971873621071, 0.82866...
The first step is to reduce the data and keep only the events that contain a signal. We consider that there is a signal when a value in one of the rows of the "v" column is greater than 0.05. Each event has 36 channels. If at least one of those 36 channels contains a signal, we keep all the channels associated with that event, and therefore we keep the event. However, if no signal is detected in any of the event's 36 channels, I remove the event entirely from the dataframe.
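For illustration only, here is a minimal vectorized pandas sketch of that selection rule (assuming the dataframe is called df, the waveform column is "v" and the threshold is 0.05, as described above), without any multiprocessing:

import numpy as np

LIMIT = 0.05  # signal threshold from the description above

# True for every row (channel) whose waveform exceeds the threshold somewhere.
row_has_signal = df["v"].apply(lambda waveform: np.max(waveform) > LIMIT)

# Keep every event for which at least one of its 36 channels has a signal.
events_with_signal = df.loc[row_has_signal, "event_number"].unique()
df_reduced = df[df["event_number"].isin(events_with_signal)]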
I have already written a function that does this part. To reduce the computation time, I would like to use multiprocessing. I first split the dataframe into 10 parts because I want to use 10 workers. Naturally, a sub-part of the dataframe always contains whole events; an event is never split across two separate sub-parts.
import itertools
from functools import partial
from multiprocessing import Pool

import numpy as np

Col_tension = "v"  # not shown in the original snippet; assumed to be the waveform column

def keep_event_data(dataframe, limit):
    # Return the event numbers for which at least one channel exceeds the limit.
    events = list(set(dataframe["event_number"]))
    list_keep_event = []
    for k in events:
        df_reduce = dataframe[dataframe["event_number"] == k].reset_index(drop=True)
        # Loop over the channels (rows) of this event.
        for j in df_reduce.index:
            if any(n > limit for n in df_reduce.loc[j, Col_tension]):
                list_keep_event.append(k)
                break
    return list_keep_event
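For reference, a tiny synthetic example of how the function above is meant to be used (Col_tension = "v" and the 0.05 threshold are the assumptions from the description; the real dataframe is of course much larger):

import pandas as pd

# Two fake events with two "channels" each; only event 1 has a sample above 0.05.
df_demo = pd.DataFrame({
    "event_number": [0, 0, 1, 1],
    "v": [[0.01, 0.02], [0.00, 0.03], [0.01, 0.20], [0.00, 0.01]],
})

print(keep_event_data(df_demo, limit=0.05))  # -> [1]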
def split_dataframe(dataframe, pool):
    # Split the dataframe into one sub-dataframe per worker, grouping by
    # event_number so that all channels of a given event stay together.
    max_event = dataframe["event_number"].max()
    range_iter = range(max_event + 1)
    range_split = np.array_split(range_iter, pool)
    list_df = []
    for j in range_split:
        list_df.append(dataframe[dataframe["event_number"].isin(j)])
    return list_df
if __name__ == '__main__':
    df = some_value  # the full dataframe, loaded elsewhere

    nb_pool = 10           # number of worker processes
    limit_tension = 0.05   # signal threshold on the "v" values

    # SPLIT THE DATAFRAME
    df_split = split_dataframe(dataframe=df, pool=nb_pool)

    func = partial(keep_event_data, limit=limit_tension)
    pool = Pool(nb_pool)
    res = pool.map(func, df_split)
    list_event_keep = list(itertools.chain.from_iterable(res))
    df = df[df["event_number"].isin(list_event_keep)]
    pool.close()
    pool.join()
With map from the multiprocessing library, the run time actually got longer. I also observed that it uses a lot more memory, around 10 GB of RAM, and the CPUs do not seem to be used. How can I fix this?
Thanks for your help.
One thing you can try is to evaluate as "lazily" as possible to reduce storage requirements. That means replacing the method multiprocessing.pool.Pool.map with multiprocessing.pool.Pool.imap and using generator functions and generator expressions wherever possible.
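As a minimal, self-contained illustration of the difference (toy worker and data, not the code from the question): map materializes the complete result list before returning, whereas imap hands results back one at a time as they become available:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(4) as pool:
        # map builds the entire list of 1000 results in memory before returning.
        all_results = pool.map(square, range(1000))

        # imap returns an iterator; results can be consumed (and discarded) lazily.
        total = 0
        for result in pool.imap(square, range(1000)):
            total += result
        print(total)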
If you are running on a platform that uses spawn to create new processes, such as Windows (by the way, you should tag multi-platform questions with the platform you are running under), then the creation of your initial dataframe should not happen at global scope but inside the if __name__ == "__main__": block, either inline or through a function call. If that code sits at global scope, it will be executed by every process in the multiprocessing pool as part of its initialization, consuming a lot of CPU and memory resources.
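A minimal sketch of the structure being recommended here (load_dataframe and worker are placeholder names): all expensive setup lives inside the guard, so a child process created with spawn, which re-imports the module, does not repeat it:

import pandas as pd
from multiprocessing import Pool

def worker(chunk):
    return len(chunk)  # placeholder work

def load_dataframe():
    # placeholder for however the real dataframe is actually produced
    return pd.DataFrame({"event_number": range(100)})

if __name__ == "__main__":
    # Runs only in the parent process, never during the re-import
    # that spawn performs in each child process.
    df = load_dataframe()
    chunks = [df.iloc[i:i + 10] for i in range(0, len(df), 10)]
    with Pool(4) as pool:
        print(sum(pool.map(worker, chunks)))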
The code below uses a special class, BoundedQueueProcessPool, whose imap method prevents the iterable argument from generating its elements faster than the worker function can process them. This should have a positive effect on memory usage.
import multiprocessing
import multiprocessing.pool


class ImapResult():
    def __init__(self, semaphore, result):
        self._semaphore = semaphore
        self.it = result.__iter__()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            elem = self.it.__next__()
            self._semaphore.release()
            return elem
        except StopIteration:
            raise
        except:
            self._semaphore.release()
            raise


class BoundedQueuePool:
    def __init__(self, limit, semaphore):
        self._limit = limit
        self._semaphore = semaphore

    def release(self, result, callback=None):
        self._semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        self._semaphore.acquire()
        callback_fn = self.release if callback is None else lambda result: self.release(result, callback=callback)
        error_callback_fn = self.release if error_callback is None else lambda result: self.release(result, callback=error_callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)

    def imap(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self._semaphore.acquire()
                yield elem
        if chunksize > self._limit:
            raise ValueError(f'chunksize argument exceeds {self._limit}')
        result = super().imap(func, new_iterable(iterable), chunksize)
        return ImapResult(self._semaphore, result)

    def imap_unordered(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self._semaphore.acquire()
                yield elem
        if chunksize > self._limit:
            raise ValueError(f'chunksize argument exceeds {self._limit}')
        result = super().imap_unordered(func, new_iterable(iterable), chunksize)
        return ImapResult(self._semaphore, result)


class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        limit = self._processes + max_waiting_tasks
        BoundedQueuePool.__init__(self, limit, multiprocessing.BoundedSemaphore(limit))
N_CHUNKS = 1_000

def split(lst, n):
    # Split lst into n contiguous, nearly equal pieces.
    k, m = divmod(len(lst), n)
    return (lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

def split_dataframe(dataframe):
    # Split the dataframe into N_CHUNKS pieces, yielded lazily.
    range_iter = range(len(dataframe.index))
    # This will return a generator expression that generates ranges rather
    # than arrays for potentially a big savings:
    range_split = split(range_iter, N_CHUNKS)
    for r in range_split:
        # Note: this slices by row position, so a chunk boundary can fall
        # inside an event; split on event_number instead if that matters.
        yield dataframe[r.start:r.stop]
if __name__ == "__main__":
    dataframe = some_value  # the full dataframe, created here rather than at global scope

    func = partial(keep_event_data, limit=limit_tension)
    pool = BoundedQueueProcessPool(nb_pool)
    # Use chunksize of 1:
    res = pool.imap(func, split_dataframe(dataframe))
    # keep_event_data returns lists of event numbers, so flatten them and
    # filter the original dataframe on the events that were kept:
    list_event_keep = list(itertools.chain.from_iterable(res))
    df = dataframe[dataframe["event_number"].isin(list_event_keep)]
    pool.close()
    pool.join()
But there is no getting around the fact that serializing/de-serializing dataframes from one address space to another always carries considerable overhead. Multiprocessing will only reduce the running time if your worker function keep_event_data is CPU-intensive enough to justify that extra overhead.
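One rough way to check whether that is the case (a sketch, not part of the answer's code; df_chunk stands for one of the sub-dataframes produced by split_dataframe and limit_tension for the threshold used above) is to compare the time spent pickling a chunk with the time the worker spends on it:

import pickle
import time

start = time.perf_counter()
payload = pickle.dumps(df_chunk)
pickling_time = time.perf_counter() - start

start = time.perf_counter()
keep_event_data(df_chunk, limit_tension)
work_time = time.perf_counter() - start

print(f"pickled size: {len(payload) / 1e6:.1f} MB, "
      f"pickling: {pickling_time:.3f} s, worker: {work_time:.3f} s")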