Python : 具有巨大数据帧的多处理非常慢

Python : Multiprocessing with a huge dataframe is pretty slow

我正在做粒子物理实验的数据分析。我有一个包含数百万行的巨大数据框。以下是数据帧结构的示例:

      n_events  event_number  channel                                                  t                                                  v
0          200             0      1.0  [0.0, 0.292229, 0.44511900000000004, 0.686493,...  [0.007145071463695994, 0.006022061677328001, 0...
1          200             0      2.0  [0.0, 0.28361, 0.43580699999999994, 0.70387699...  [-0.004745911500159997, -0.004473575244724004,...
2          200             0      3.0  [0.0, 0.290339, 0.44740100000000005, 0.700787,...  [-0.0016976513865119857, -0.000588632718491007...
3          200             0      4.0  [0.0, 0.299564, 0.435033, 0.701605, 0.7830996,...  [0.000518971074000005, 0.0015031308265279996, ...
4          200             0      5.0  [0.0, 0.295462, 0.43185, 0.689991, 0.804407, 1...  [-0.012224856444671991, -0.012481382646527987,...
...        ...           ...      ...                                                ...                                                ...
7195       200           199     29.0  [0.0, 0.273977, 0.394305, 0.64364, 0.781702, 1...  [-0.006007219163999997, -0.006066839886615997,...
7196       200           199     30.0  [0.0, 0.296464, 0.408051, 0.660629, 0.797999, ...  [-0.0071352449638400085, -0.007720353590299007...
7197       200           199     31.0  [0.0, 0.271873, 0.39696299999999995, 0.661967,...  [0.0007936306725499976, 0.0006720786866000081,...
7198       200           199     32.0  [0.0, 0.274194, 0.390268, 0.652645, 0.794755, ...  [0.008792689244320001, 0.008201458288511989, 0...
7199       200           199     36.0  [0.0, 0.281648, 0.402901, 0.656489, 0.793202, ...  [0.9035749247410491, 0.899703268236, 0.8972429...


    n_events  event_number  channel                                                  t                                                  v
0        200             0      1.0  [0.0, 0.292229, 0.44511900000000004, 0.686493,...  [0.007145071463695994, 0.006022061677328001, 0...
1        200             0      2.0  [0.0, 0.28361, 0.43580699999999994, 0.70387699...  [-0.004745911500159997, -0.004473575244724004,...
2        200             0      3.0  [0.0, 0.290339, 0.44740100000000005, 0.700787,...  [-0.0016976513865119857, -0.000588632718491007...
3        200             0      4.0  [0.0, 0.299564, 0.435033, 0.701605, 0.7830996,...  [0.000518971074000005, 0.0015031308265279996, ...
4        200             0      5.0  [0.0, 0.295462, 0.43185, 0.689991, 0.804407, 1...  [-0.012224856444671991, -0.012481382646527987,...
5        200             0      6.0  [0.0, 0.304114, 0.431968, 0.6919690000000001, ...  [-0.006504729964460993, -0.006973892963776013,...
6        200             0      7.0  [0.0, 0.296276, 0.435403, 0.694577, 0.801506, ...  [0.0076502175278079804, 0.007291070848924005, ...
7        200             0      8.0  [0.0, 0.302246, 0.43909600000000004, 0.707817,...  [0.005709271868173, 0.005525502225727999, 0.00...
8        200             0     33.0  [0.0, 0.312825, 0.450079, 0.7171700000000001, ...  [0.6907829934142, 0.65032811641518, 0.63168262...
9        200             0      9.0  [0.0, 0.162366, 0.41691300000000003, 0.546435,...  [-0.004370937556799992, -0.004156079620345007,...
10       200             0     10.0  [0.0, 0.168124, 0.430847, 0.551334, 0.810323, ...  [0.005713839258880994, 0.005151084024971997, 0...
11       200             0     11.0  [0.0, 0.166088, 0.43300900000000003, 0.5373290...  [-0.006496423078124987, -0.0059013241952349995...
12       200             0     12.0  [0.0, 0.133375, 0.4204, 0.521838, 0.803831, 0....  [-0.0029161667740250116, -0.004175291108256005...
13       200             0     13.0  [0.0, 0.126736, 0.425956, 0.530385, 0.811472, ...  [-0.0045837650761579906, -0.003585277339199997...
14       200             0     14.0  [0.0, 0.134787, 0.419325, 0.527267, 0.808555, ...  [-0.001530516654803007, -0.0011034030005499943...
15       200             0     15.0  [0.0, 0.1509, 0.444928, 0.5294964, 0.8145954, ...  [-0.00559784605927201, -0.004645566291229007, ...
16       200             0     16.0  [0.0, 0.134896, 0.422334, 0.538761, 0.81445200...  [0.009337214123531992, 0.009593533354463982, 0...
17       200             0     34.0  [0.0, 0.13378, 0.409487, 0.528365, 0.791995, 0...  [0.809813636495625, 0.7967833234162501, 0.7681...
18       200             0     17.0  [0.0, 0.240012, 0.402098, 0.6908080000000001, ...  [-0.002736104546672001, -0.0034600646495999824...
19       200             0     18.0  [0.0, 0.256932, 0.40772600000000003, 0.694829,...  [-0.004761098550782997, -0.004992603885120003,...
20       200             0     19.0  [0.0, 0.260573, 0.411907, 0.7032830000000001, ...  [-0.0020984008527860022, -0.001233839987092994...
21       200             0     20.0  [0.0, 0.260196, 0.395066, 0.7063619999999999, ...  [0.005039756488365984, 0.006215568753132001, 0...
22       200             0     21.0  [0.0, 0.26385, 0.377695, 0.690218, 0.792184, 1...  [0.0012567172712239978, 0.0017176489079889995,...
23       200             0     22.0  [0.0, 0.281995, 0.414246, 0.7113590000000001, ...  [-0.004021068645631988, -0.0038137520037749995...
24       200             0     23.0  [0.0, 0.274982, 0.39725, 0.7177819999999999, 0...  [0.00020083088635199054, -7.622279512499036e-0...
25       200             0     24.0  [0.0, 0.26048, 0.37926099999999996, 0.71315, 0...  [0.015447624570974998, 0.015128326144224, 0.01...
26       200             0     35.0  [0.0, 0.260156, 0.404854, 0.6990419999999999, ...  [0.826655732310342, 0.8016246495956479, 0.7899...
27       200             0     25.0  [0.0, 0.161695, 0.420327, 0.576986, 0.853091, ...  [-0.01176219648990401, -0.011605116890357987, ...
28       200             0     26.0  [0.0, 0.142957, 0.414581, 0.5652809999999999, ...  [-0.00017965899878400054, 0.000329458201304994...
29       200             0     27.0  [0.0, 0.151938, 0.42064199999999996, 0.544481,...  [-0.012079087800160012, -0.011521835995015996,...
30       200             0     28.0  [0.0, 0.133352, 0.41161899999999996, 0.53848, ...  [-0.005415268565248003, -0.0057859392168489975...
31       200             0     29.0  [0.0, 0.126623, 0.41861800000000005, 0.545479,...  [-0.012657557405900003, -0.012799759762259007,...
32       200             0     30.0  [0.0, 0.117088, 0.41099399999999997, 0.55351, ...  [-0.013853044528840005, -0.013165832426528, -0...
33       200             0     31.0  [0.0, 0.138599, 0.425755, 0.531386, 0.830847, ...  [-0.005198338900159984, -0.006477930675, -0.00...
34       200             0     32.0  [0.0, 0.128048, 0.389703, 0.548266, 0.835658, ...  [0.004916808473749997, 0.005265623096212002, 0...
35       200             0     36.0  [0.0, 0.129798, 0.41002700000000003, 0.551741,...  [0.8577803214022401, 0.84971873621071, 0.82866...

第一步是减少数据,只保留有信号的事件。当“v”列的其中一行中的值大于 0.05 时,我们认为是一个信号。每个事件有 36 个通道。如果在这 36 个通道中我们至少有一个信号,我们保留与此事件相关的所有通道,因此我们保留此事件。但是,如果在事件的 36 个通道中未检测到信号,我会从数据帧中完全删除该事件。

我已经写了一个函数来完成这部分。为了减少计算时间,我想使用多处理。我首先将数据框分成 10 个部分,因为我想使用 10 个线程。显然,数据框的子部分将始终包含整个事件。事件不能拆分为两个单独的子部分。

def keep_event_data(dataframe, limit) :

    events = list(set(dataframe["event_number"]))
    list_keep_event = []

    for k in events :

        df_reduce = dataframe[dataframe["event_number"]==k].reset_index(drop=True)

        # WE START THE LOOP IN THE WHOLE DATAFRAME
        for j in df_reduce.index :

            if any(n > limit for n in df_reduce.loc[j, Col_tension]) :
    
                list_keep_event.append(k)
                break
            
            else :

                pass

    return list_keep_event


def split_dataframe(dataframe, pool) :

    max_event = dataframe["event_number"].max()

    # Split the dataframe into a list
    range_iter = range(max_event+1)
    range_split = np.array_split(range_iter, pool)

    list_df = []

    for j in range_split :

        list_df.append(dataframe[dataframe["event_number"].isin(j)])

    return list_df


if __name__ == '__main__':

    df = some_value

    # SPLIT TH DATAFRAME 
    df_split = split_dataframe(dataframe=df, pool=nb_pool)


    func = partial(keep_event_data, limit=limit_tension)

    pool = Pool(nb_pool)
    res = pool.map(func, df_split)
    list_event_keep = list(itertools.chain.from_iterable(res))
    df = df[df["event_number"].isin(list_event_keep)]
    pool.close()
    pool.join()

使用multiprocessing库的map,时间变长了。我观察到它使用了大约 10 Gb RAM 的更多内存,并且 CPU 似乎没有被使用。我该如何解决这个问题。

感谢您的帮助。

您可以尝试的一件事是尽可能“懒惰地”评估以减少存储需求。这意味着用方法 multiprocessing.pool.Pool.imap 代替 multiprocessing.pool.Pool.map 并尽可能使用生成器函数和表达式。

如果你运行在使用spawn创建新流程的平台上,例如Windows(顺便说一句,你应该用平台标记多平台问题你 运行 下),你创建的初始数据帧不应该在全局范围内,而是在 if __name__ == "__main__" : 块内内联或通过函数调用完成。如果代码在全局范围内,它将由多处理池中的每个进程执行,作为其初始化的一部分,需要大量 CPU 和内存资源。

以下代码使用特殊的 class、BoundedQueueProcessPool,其 imap 方法将阻止 iterable 参数更快地生成其元素比工作函数可以处理的要多。这应该会对内存使用产生积极影响。

import multiprocessing.pool
import multiprocessing

class ImapResult():
    def __init__(self, semaphore, result):
        self._semaphore = semaphore
        self.it = result.__iter__()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            elem = self.it.__next__()
            self._semaphore.release()
            return elem
        except StopIteration:
            raise
        except:
            self._semaphore.release()
            raise

class BoundedQueuePool:
    def __init__(self, limit, semaphore):
        self._limit = limit
        self._semaphore = semaphore

    def release(self, result, callback=None):
        self._semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        self._semaphore.acquire()
        callback_fn = self.release if callback is None else lambda result: self.release(result, callback=callback)
        error_callback_fn = self.release if error_callback is None else lambda result: self.release(result, callback=callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)

    def imap(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self._semaphore.acquire()
                yield elem
        if chunksize > self._limit:
            raise ValueError(f'chunksize argument exceeds {self._limit}')
        result = super().imap(func, new_iterable(iterable), chunksize)
        return ImapResult(self._semaphore, result)

    def imap_unordered(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self._semaphore.acquire()
                yield elem
        if chunksize > self._limit:
            raise ValueError(f'chunksize argument exceeds {self._limit}')
        result = super().imap_unordered(func, new_iterable(iterable), chunksize)
        return ImapResult(self._semaphore, result)

class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        limit = self._processes + max_waiting_tasks
        BoundedQueuePool.__init__(self, limit, multiprocessing.BoundedSemaphore(limit))

N_CHUNKS = 1_000

def split(lst, n):
    k, m = divmod(len(lst), n)
    return (lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

def split_dataframe(dataframe):
    # Split the dataframe into a list
    range_iter = range(len(dataframe.index))
    # This will return a generator expression that generates ranges rather
    # than arrays for potentially a big savings:
    range_split = split(range_iter, N_CHUNKS)
    for r in range_split:
        yield dataframe[r.start:r.stop]


if __name__ == "__main__" :

    dataframe = some_value

    func = partial(keep_event_data, limit=limit_tension)

    pool = BoundedQueueProcessPool(nb_pool)
    # Use chunksize of 1:
    res = pool.imap(func, split_dataframe(dataframe))
    df = pd.concat(res, ignore_index=True)
    pool.close()
    pool.join()

但是无法回避这样一个事实,即从一个地址 space 到另一个地址的 serializing/de-serializing 数据帧总是会有相当大的开销。如果您的工作函数 keep_event_data 足够 CPU 密集以证明额外的开销是合理的,则多处理只会减少 运行 时间。