优化 pandas 数据帧应用于解码 CAN 帧 - 遍历行

Optimizing pandas dataframe apply to decode CAN frames - iterating over rows

我正在尝试减少在数据帧(最多 200 万行)中的每一行上应用来自 cantools 库的复杂函数所花费的时间: Timestamp Type ID Data 0 16T122109957 0 522 b'0006' 1 16T122109960 0 281 b'0000ce52d2290000' 2 16T122109960 0 279 b'0000000000000000' 3 16T122109960 0 304 b'0000' 4 16T122109961 0 277 b'400000'

使用上面的数据框和读入的dbc文件。dbc文件是一组关于如何encode/decode数据的规则。

使用 DataFrame 应用最多可能需要 10 分钟:

df['decoded'] = df.apply(lambda x: dbc.decode_message(df['ID'][x], df['Data']))

将两列放入列表中,然后遍历列表只需要大约一分钟即可完成,但是当将新数组保存到数据帧时,出现错误 ValueError: array is too big。这是预期的,因为它很大。

示例循环代码:

id_list = df['ID'].tolist()
datalist = df['Data'].tolist()
for i in range(len(id_list)):
    listOfDicts.append(dbc.decode_message(id_list[i], datalist[i]))
Data = DataFrame(listOfDicts)

我尝试了 python 矢量化,这显然是最快的,但遇到了我似乎无法修复的错误 TypeError: 'Series' objects are mutable, thus they cannot be hashed。 示例:

Data['dict'] = dbc.decode_message(df['ID'], df['Data'])

是否有任何其他方法可以加快应用过程,或者我应该尝试进行矢量化?

最小示例:

import cantools
import pandas as pd

df = pd.read_csv('file.log', skiprows=11, sep=';')
dbc = cantools.database.load_file('file.dbc')

# option 1 SLOW
df['decoded'] = df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']))

# option 2 Faster...
id_list = df['ID'].tolist()
datalist = df['Data'].tolist()
for i in range(len(id_list)):
    listOfDicts.append(dbc.decode_message(id_list[i], datalist[i]))
Data = DataFrame(listOfDicts) #< -- causes error for being to big

#option 3
df['dict'] = dbc.decode_message(df['ID'], df['Data']) #< --Error

将此作为答案发布,但 YMMV:

只要 cantools 库不支持处理 SeriesDataFrame 对象,向量化就不会起作用。所以使用 apply 是唯一的方法。

由于 dbc 转换是逐行进行的,没有任何行间依赖性,因此您应该能够将其并行化。

你需要

  • 编写一个函数进行数据帧的转换:

    def decode(df):
        df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']), axis=1)
        return df
    
  • 这样称呼它:

    import pandas as pd
    import numpy as np
    import multiprocessing as mp
    
    def parallelApply(df, func, numChunks=4):
        df_split = np.array_split(df, numChunks)
        pool = mp.Pool(numChunks)
        df = pd.concat(pool.map(func, df_split))
        pool.close()
        pool.join()
        return df
    
    df = parallelApply(df, decode)
    

parallelApply 所做的是将数据帧拆分为 numChunks 块,并创建一个包含那么多条目的多处理池。

然后在单独的进程中将函数 func(在您的情况下为 decode)应用于每个块。

decode returns 它更新的数据帧块 pd.concat 将再次合并它们。


还有一个名为 pandarallel 的非常方便的库可以为您执行此操作,但是当 运行 on Windows 时您将被迫使用 WSL。:

pip install pandarallel

调用后

from pandarallel import pandarallel
pandarallel.initialize()

你只需转换来自

的调用
df.apply(...)

df.parallel_apply(func)

该库将启动多个进程并让每个进程处理一个数据子集。

改编自 M. Spiller 的回答 - 差异显示在括号中:

(imports) 这些必须导入: 从 multiprocessing.dummy 导入 freeze_support

import cantools
import pandas as pd
from itertools import repeat
import multiprocessing as mp

编写一个函数进行数据帧的转换(并传递 dbc 进行解码):

def decode(df, dbc):
    df2 = df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']), axis=1)
    df2 = pd.DataFrame
    return df2

这样调用(通过函数传递 dbc):

def parallel_apply(df, func, dbc=None, numChunks=mp.cpu_count()):
    df_split = np.array_split(df, numChunks)
    pool = mp.Pool(numChunks)

    df2 = pd.concat(pool.starmap(func, zip(df_split, repeat(dbc))))
    pool.close()
    pool.join()
    return df2

Freeze_support()
#read in dbc
#read in df with encoded CAN messages
df2 = parallel_apply(df, decode, dbc)

实现已放置评论的阅读功能。此解决方案将使用 CPU 上的所有核心并将任务分成 4 个块 - 并行处理并在最后重新加入数据帧。