优化 pandas 数据帧应用于解码 CAN 帧 - 遍历行
Optimizing pandas dataframe apply to decode CAN frames - iterating over rows
我正在尝试减少在数据帧(最多 200 万行)中的每一行上应用来自 cantools 库的复杂函数所花费的时间:
Timestamp Type ID Data
0 16T122109957 0 522 b'0006'
1 16T122109960 0 281 b'0000ce52d2290000'
2 16T122109960 0 279 b'0000000000000000'
3 16T122109960 0 304 b'0000'
4 16T122109961 0 277 b'400000'
使用上面的数据框和读入的dbc文件。dbc文件是一组关于如何encode/decode数据的规则。
使用 DataFrame 应用最多可能需要 10 分钟:
df['decoded'] = df.apply(lambda x: dbc.decode_message(df['ID'][x], df['Data']))
将两列放入列表中,然后遍历列表只需要大约一分钟即可完成,但是当将新数组保存到数据帧时,出现错误 ValueError: array is too big
。这是预期的,因为它很大。
示例循环代码:
id_list = df['ID'].tolist()
datalist = df['Data'].tolist()
for i in range(len(id_list)):
listOfDicts.append(dbc.decode_message(id_list[i], datalist[i]))
Data = DataFrame(listOfDicts)
我尝试了 python 矢量化,这显然是最快的,但遇到了我似乎无法修复的错误 TypeError: 'Series' objects are mutable, thus they cannot be hashed
。
示例:
Data['dict'] = dbc.decode_message(df['ID'], df['Data'])
是否有任何其他方法可以加快应用过程,或者我应该尝试进行矢量化?
最小示例:
import cantools
import pandas as pd
df = pd.read_csv('file.log', skiprows=11, sep=';')
dbc = cantools.database.load_file('file.dbc')
# option 1 SLOW
df['decoded'] = df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']))
# option 2 Faster...
id_list = df['ID'].tolist()
datalist = df['Data'].tolist()
for i in range(len(id_list)):
listOfDicts.append(dbc.decode_message(id_list[i], datalist[i]))
Data = DataFrame(listOfDicts) #< -- causes error for being to big
#option 3
df['dict'] = dbc.decode_message(df['ID'], df['Data']) #< --Error
将此作为答案发布,但 YMMV:
只要 cantools
库不支持处理 Series
或 DataFrame
对象,向量化就不会起作用。所以使用 apply
是唯一的方法。
由于 dbc 转换是逐行进行的,没有任何行间依赖性,因此您应该能够将其并行化。
你需要
编写一个函数进行数据帧的转换:
def decode(df):
df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']), axis=1)
return df
这样称呼它:
import pandas as pd
import numpy as np
import multiprocessing as mp
def parallelApply(df, func, numChunks=4):
df_split = np.array_split(df, numChunks)
pool = mp.Pool(numChunks)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
df = parallelApply(df, decode)
parallelApply
所做的是将数据帧拆分为 numChunks
块,并创建一个包含那么多条目的多处理池。
然后在单独的进程中将函数 func
(在您的情况下为 decode
)应用于每个块。
decode
returns 它更新的数据帧块 pd.concat
将再次合并它们。
还有一个名为 pandarallel 的非常方便的库可以为您执行此操作,但是当 运行 on Windows 时您将被迫使用 WSL。:
pip install pandarallel
调用后
from pandarallel import pandarallel
pandarallel.initialize()
你只需转换来自
的调用
df.apply(...)
到
df.parallel_apply(func)
该库将启动多个进程并让每个进程处理一个数据子集。
改编自 M. Spiller 的回答 - 差异显示在括号中:
(imports) 这些必须导入:
从 multiprocessing.dummy 导入 freeze_support
import cantools
import pandas as pd
from itertools import repeat
import multiprocessing as mp
编写一个函数进行数据帧的转换(并传递 dbc 进行解码):
def decode(df, dbc):
df2 = df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']), axis=1)
df2 = pd.DataFrame
return df2
这样调用(通过函数传递 dbc):
def parallel_apply(df, func, dbc=None, numChunks=mp.cpu_count()):
df_split = np.array_split(df, numChunks)
pool = mp.Pool(numChunks)
df2 = pd.concat(pool.starmap(func, zip(df_split, repeat(dbc))))
pool.close()
pool.join()
return df2
Freeze_support()
#read in dbc
#read in df with encoded CAN messages
df2 = parallel_apply(df, decode, dbc)
实现已放置评论的阅读功能。此解决方案将使用 CPU 上的所有核心并将任务分成 4 个块 - 并行处理并在最后重新加入数据帧。
我正在尝试减少在数据帧(最多 200 万行)中的每一行上应用来自 cantools 库的复杂函数所花费的时间:
Timestamp Type ID Data
0 16T122109957 0 522 b'0006'
1 16T122109960 0 281 b'0000ce52d2290000'
2 16T122109960 0 279 b'0000000000000000'
3 16T122109960 0 304 b'0000'
4 16T122109961 0 277 b'400000'
使用上面的数据框和读入的dbc文件。dbc文件是一组关于如何encode/decode数据的规则。
使用 DataFrame 应用最多可能需要 10 分钟:
df['decoded'] = df.apply(lambda x: dbc.decode_message(df['ID'][x], df['Data']))
将两列放入列表中,然后遍历列表只需要大约一分钟即可完成,但是当将新数组保存到数据帧时,出现错误 ValueError: array is too big
。这是预期的,因为它很大。
示例循环代码:
id_list = df['ID'].tolist()
datalist = df['Data'].tolist()
for i in range(len(id_list)):
listOfDicts.append(dbc.decode_message(id_list[i], datalist[i]))
Data = DataFrame(listOfDicts)
我尝试了 python 矢量化,这显然是最快的,但遇到了我似乎无法修复的错误 TypeError: 'Series' objects are mutable, thus they cannot be hashed
。
示例:
Data['dict'] = dbc.decode_message(df['ID'], df['Data'])
是否有任何其他方法可以加快应用过程,或者我应该尝试进行矢量化?
最小示例:
import cantools
import pandas as pd
df = pd.read_csv('file.log', skiprows=11, sep=';')
dbc = cantools.database.load_file('file.dbc')
# option 1 SLOW
df['decoded'] = df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']))
# option 2 Faster...
id_list = df['ID'].tolist()
datalist = df['Data'].tolist()
for i in range(len(id_list)):
listOfDicts.append(dbc.decode_message(id_list[i], datalist[i]))
Data = DataFrame(listOfDicts) #< -- causes error for being to big
#option 3
df['dict'] = dbc.decode_message(df['ID'], df['Data']) #< --Error
将此作为答案发布,但 YMMV:
只要 cantools
库不支持处理 Series
或 DataFrame
对象,向量化就不会起作用。所以使用 apply
是唯一的方法。
由于 dbc 转换是逐行进行的,没有任何行间依赖性,因此您应该能够将其并行化。
你需要
编写一个函数进行数据帧的转换:
def decode(df): df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']), axis=1) return df
这样称呼它:
import pandas as pd import numpy as np import multiprocessing as mp def parallelApply(df, func, numChunks=4): df_split = np.array_split(df, numChunks) pool = mp.Pool(numChunks) df = pd.concat(pool.map(func, df_split)) pool.close() pool.join() return df df = parallelApply(df, decode)
parallelApply
所做的是将数据帧拆分为 numChunks
块,并创建一个包含那么多条目的多处理池。
然后在单独的进程中将函数 func
(在您的情况下为 decode
)应用于每个块。
decode
returns 它更新的数据帧块 pd.concat
将再次合并它们。
还有一个名为 pandarallel 的非常方便的库可以为您执行此操作,但是当 运行 on Windows 时您将被迫使用 WSL。:
pip install pandarallel
调用后
from pandarallel import pandarallel
pandarallel.initialize()
你只需转换来自
的调用df.apply(...)
到
df.parallel_apply(func)
该库将启动多个进程并让每个进程处理一个数据子集。
改编自 M. Spiller 的回答 - 差异显示在括号中:
(imports) 这些必须导入: 从 multiprocessing.dummy 导入 freeze_support
import cantools
import pandas as pd
from itertools import repeat
import multiprocessing as mp
编写一个函数进行数据帧的转换(并传递 dbc 进行解码):
def decode(df, dbc):
df2 = df.apply(lambda x: dbc.decode_message(x['ID'], x['Data']), axis=1)
df2 = pd.DataFrame
return df2
这样调用(通过函数传递 dbc):
def parallel_apply(df, func, dbc=None, numChunks=mp.cpu_count()):
df_split = np.array_split(df, numChunks)
pool = mp.Pool(numChunks)
df2 = pd.concat(pool.starmap(func, zip(df_split, repeat(dbc))))
pool.close()
pool.join()
return df2
Freeze_support()
#read in dbc
#read in df with encoded CAN messages
df2 = parallel_apply(df, decode, dbc)
实现已放置评论的阅读功能。此解决方案将使用 CPU 上的所有核心并将任务分成 4 个块 - 并行处理并在最后重新加入数据帧。