Pandas 数据帧到 Object 批量数据库插入的实例数组效率
Pandas dataframe to Object instances array efficiency for bulk DB insert
我有一个 Pandas 数据框,其形式为:
Time Temperature Voltage Current
0.0 7.8 14 56
0.1 7.9 12 58
0.2 7.6 15 55
... So on for a few hundred thousand rows...
我需要尽快将数据批量插入到 PostgreSQL 数据库中。这是一个 Django 项目,我目前正在使用 ORM 进行数据库操作和构建查询,但如果有更有效的方法来完成任务,我愿意接受建议。
我的数据模型如下所示:
class Data(models.Model):
time = models.DateTimeField(db_index=True)
parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE)
parameter_value = models.FloatField()
所以 Time
是 DataFrame 的 row[0]
,然后对于每个 header 列,我使用 header 作为 parameter
。所以示例 table 的 row[0]
会在我的数据库中生成 3 Data
objects:
Data(time=0.0, parameter="Temperature", parameter_value=7.8)
Data(time=0.0, parameter="Voltage", parameter_value=14)
Data(time=0.0, parameter="Current", parameter_value=56)
我们的应用程序允许用户解析以毫秒为单位的数据文件。所以我们从一个文件中生成了很多单独的数据 objects。我当前的任务是改进解析器,使其更加高效,直到我们达到 I/O 硬件级别的限制。
我目前的解决方案是遍历每一行,为 time + parameter + value
上的每一行创建一个 Data
object 并将所述 object 附加到一个数组中,这样我就可以Data.objects.bulk_create(all_data_objects)
通过 Django。当然我知道这是低效的,可能会有很大的改进。
使用此代码:
# Convert DataFrame to dict
df_records = df.to_dict('records')
# Start empty dta array
all_data_objects = []
# Go through each row creating objects and appending to data array
for row in df_records:
for parameter, parameter_value in row.items():
if parameter != "Time":
all_data_objects.append(Data(
time=row["Time"],
parameter_value=parameter_value,
parameter=parameter))
# Commit data to Postgres DB
Data.objects.bulk_create(all_data)
目前整个操作,不包括数据库插入操作(写入磁盘),即只生成Data
objects数组,对于生成大约 600 万个 Data
object 的 55mb 文件,大约需要 370 秒。仅 df_records = df.to_dict('records')
行需要 83 秒。在每个部分的两端使用 time.time()
测量时间并计算差异。
我怎样才能改善这些时间?
您不需要为所有行创建数据对象。 SqlAlchemy 也支持这种方式的批量插入:
data.insert().values([
dict(time=0.0, parameter="Temperature", parameter_value=7.8),
dict(time=0.0, parameter="Voltage", parameter_value=14)
])
如果您只需要插入数据,则不需要 pandas 并且可以为您的数据文件使用不同的解析器(或者编写您自己的解析器,具体取决于您的数据文件的格式)。此外,将数据集拆分成更小的部分并并行化插入命令可能是有意义的。
如果你真的需要一个快速的解决方案,我建议你直接使用pandas
.
来愚弄table
首先让我们为您的示例创建数据:
import pandas as pd
data = {
'Time': {0: 0.0, 1: 0.1, 2: 0.2},
'Temperature': {0: 7.8, 1: 7.9, 2: 7.6},
'Voltage': {0: 14, 1: 12, 2: 15},
'Current': {0: 56, 1: 58, 2: 55}
}
df = pd.DataFrame(data)
现在您应该转换数据框,以便您拥有所需的列 melt
:
df = df.melt(["Time"], var_name="parameter", value_name="parameter_value")
此时您应该将 parameter
值映射到外部 id
。我将以 params
为例:
params = {"Temperature": 1, "Voltage": 2, "Current": 3}
df["parameter"] = df["parameter"].map(params)
此时数据框将如下所示:
Time parameter parameter_value
0 0.0 1 7.8
1 0.1 1 7.9
2 0.2 1 7.6
3 0.0 2 14.0
4 0.1 2 12.0
5 0.2 2 15.0
6 0.0 3 56.0
7 0.1 3 58.0
8 0.2 3 55.0
现在要使用 pandas 导出,您可以使用:
import sqlalchemy as sa
engine = sa.create_engine("use your connection data")
df.to_sql(name="my_table", con=engine, if_exists="append", index=False)
但是我用的时候速度不够快,不能满足我们的要求。所以我建议你使用 cursor.copy_from
insted 因为它更快:
from io import StringIO
output = StringIO()
df.to_csv(output, sep=';', header=False, index=False, columns=df.columns)
output.getvalue()
# jump to start of stream
output.seek(0)
# Insert df into postgre
connection = engine.raw_connection()
with connection.cursor() as cursor:
cursor.copy_from(output, "my_table", sep=';', null="NULL", columns=(df.columns))
connection.commit()
我们尝试了数百万次,这是使用 PostgreSQL.
时最快的方法
我有一个 Pandas 数据框,其形式为:
Time Temperature Voltage Current
0.0 7.8 14 56
0.1 7.9 12 58
0.2 7.6 15 55
... So on for a few hundred thousand rows...
我需要尽快将数据批量插入到 PostgreSQL 数据库中。这是一个 Django 项目,我目前正在使用 ORM 进行数据库操作和构建查询,但如果有更有效的方法来完成任务,我愿意接受建议。
我的数据模型如下所示:
class Data(models.Model):
time = models.DateTimeField(db_index=True)
parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE)
parameter_value = models.FloatField()
所以 Time
是 DataFrame 的 row[0]
,然后对于每个 header 列,我使用 header 作为 parameter
。所以示例 table 的 row[0]
会在我的数据库中生成 3 Data
objects:
Data(time=0.0, parameter="Temperature", parameter_value=7.8)
Data(time=0.0, parameter="Voltage", parameter_value=14)
Data(time=0.0, parameter="Current", parameter_value=56)
我们的应用程序允许用户解析以毫秒为单位的数据文件。所以我们从一个文件中生成了很多单独的数据 objects。我当前的任务是改进解析器,使其更加高效,直到我们达到 I/O 硬件级别的限制。
我目前的解决方案是遍历每一行,为 time + parameter + value
上的每一行创建一个 Data
object 并将所述 object 附加到一个数组中,这样我就可以Data.objects.bulk_create(all_data_objects)
通过 Django。当然我知道这是低效的,可能会有很大的改进。
使用此代码:
# Convert DataFrame to dict
df_records = df.to_dict('records')
# Start empty dta array
all_data_objects = []
# Go through each row creating objects and appending to data array
for row in df_records:
for parameter, parameter_value in row.items():
if parameter != "Time":
all_data_objects.append(Data(
time=row["Time"],
parameter_value=parameter_value,
parameter=parameter))
# Commit data to Postgres DB
Data.objects.bulk_create(all_data)
目前整个操作,不包括数据库插入操作(写入磁盘),即只生成Data
objects数组,对于生成大约 600 万个 Data
object 的 55mb 文件,大约需要 370 秒。仅 df_records = df.to_dict('records')
行需要 83 秒。在每个部分的两端使用 time.time()
测量时间并计算差异。
我怎样才能改善这些时间?
您不需要为所有行创建数据对象。 SqlAlchemy 也支持这种方式的批量插入:
data.insert().values([
dict(time=0.0, parameter="Temperature", parameter_value=7.8),
dict(time=0.0, parameter="Voltage", parameter_value=14)
])
如果您只需要插入数据,则不需要 pandas 并且可以为您的数据文件使用不同的解析器(或者编写您自己的解析器,具体取决于您的数据文件的格式)。此外,将数据集拆分成更小的部分并并行化插入命令可能是有意义的。
如果你真的需要一个快速的解决方案,我建议你直接使用pandas
.
首先让我们为您的示例创建数据:
import pandas as pd
data = {
'Time': {0: 0.0, 1: 0.1, 2: 0.2},
'Temperature': {0: 7.8, 1: 7.9, 2: 7.6},
'Voltage': {0: 14, 1: 12, 2: 15},
'Current': {0: 56, 1: 58, 2: 55}
}
df = pd.DataFrame(data)
现在您应该转换数据框,以便您拥有所需的列 melt
:
df = df.melt(["Time"], var_name="parameter", value_name="parameter_value")
此时您应该将 parameter
值映射到外部 id
。我将以 params
为例:
params = {"Temperature": 1, "Voltage": 2, "Current": 3}
df["parameter"] = df["parameter"].map(params)
此时数据框将如下所示:
Time parameter parameter_value
0 0.0 1 7.8
1 0.1 1 7.9
2 0.2 1 7.6
3 0.0 2 14.0
4 0.1 2 12.0
5 0.2 2 15.0
6 0.0 3 56.0
7 0.1 3 58.0
8 0.2 3 55.0
现在要使用 pandas 导出,您可以使用:
import sqlalchemy as sa
engine = sa.create_engine("use your connection data")
df.to_sql(name="my_table", con=engine, if_exists="append", index=False)
但是我用的时候速度不够快,不能满足我们的要求。所以我建议你使用 cursor.copy_from
insted 因为它更快:
from io import StringIO
output = StringIO()
df.to_csv(output, sep=';', header=False, index=False, columns=df.columns)
output.getvalue()
# jump to start of stream
output.seek(0)
# Insert df into postgre
connection = engine.raw_connection()
with connection.cursor() as cursor:
cursor.copy_from(output, "my_table", sep=';', null="NULL", columns=(df.columns))
connection.commit()
我们尝试了数百万次,这是使用 PostgreSQL.
时最快的方法