Pandas 数据帧到 Object 批量数据库插入的实例数组效率

Pandas dataframe to Object instances array efficiency for bulk DB insert

我有一个 Pandas 数据框,其形式为:

Time    Temperature    Voltage    Current
0.0     7.8            14         56
0.1     7.9            12         58
0.2     7.6            15         55
... So on for a few hundred thousand rows...

我需要尽快将数据批量插入到 PostgreSQL 数据库中。这是一个 Django 项目,我目前正在使用 ORM 进行数据库操作和构建查询,但如果有更有效的方法来完成任务,我愿意接受建议。

我的数据模型如下所示:

class Data(models.Model):
    time = models.DateTimeField(db_index=True)
    parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE)
    parameter_value = models.FloatField()

所以 Time 是 DataFrame 的 row[0],然后对于每个 header 列,我使用 header 作为 parameter。所以示例 table 的 row[0] 会在我的数据库中生成 3 Data objects:

Data(time=0.0, parameter="Temperature", parameter_value=7.8)
Data(time=0.0, parameter="Voltage", parameter_value=14)
Data(time=0.0, parameter="Current", parameter_value=56)

我们的应用程序允许用户解析以毫秒为单位的数据文件。所以我们从一个文件中生成了很多单独的数据 objects。我当前的任务是改进解析器,使其更加高效,直到我们达到 I/O 硬件级别的限制。

我目前的解决方案是遍历每一行,为 time + parameter + value 上的每一行创建一个 Data object 并将所述 object 附加到一个数组中,这样我就可以Data.objects.bulk_create(all_data_objects) 通过 Django。当然我知道这是低效的,可能会有很大的改进。

使用此代码:

# Convert DataFrame to dict
df_records = df.to_dict('records')

# Start empty dta array
all_data_objects = []

# Go through each row creating objects and appending to data array
for row in df_records:
    for parameter, parameter_value in row.items():
        if parameter != "Time":
            all_data_objects.append(Data(
                    time=row["Time"],
                    parameter_value=parameter_value,
                    parameter=parameter))

# Commit data to Postgres DB
Data.objects.bulk_create(all_data)

目前整个操作,不包括数据库插入操作(写入磁盘),即只生成Data objects数组,对于生成大约 600 万个 Data object 的 55mb 文件,大约需要 370 秒。仅 df_records = df.to_dict('records') 行需要 83 秒。在每个部分的两端使用 time.time() 测量时间并计算差异。

我怎样才能改善这些时间?

您不需要为所有行创建数据对象。 SqlAlchemy 也支持这种方式的批量插入:

data.insert().values([
                    dict(time=0.0, parameter="Temperature", parameter_value=7.8),
                    dict(time=0.0, parameter="Voltage", parameter_value=14)
                ])

有关详细信息,请参阅 https://docs.sqlalchemy.org/en/13/core/dml.html?highlight=insert%20values#sqlalchemy.sql.expression.ValuesBase.values

如果您只需要插入数据,则不需要 pandas 并且可以为您的数据文件使用不同的解析器(或者编写您自己的解析器,具体取决于您的数据文件的格式)。此外,将数据集拆分成更小的部分并并行化插入命令可能是有意义的。

如果你真的需要一个快速的解决方案,我建议你直接使用pandas.

来愚弄table

首先让我们为您的示例创建数据:

import pandas as pd

data = {
    'Time': {0: 0.0, 1: 0.1, 2: 0.2},
    'Temperature': {0: 7.8, 1: 7.9, 2: 7.6},
    'Voltage': {0: 14, 1: 12, 2: 15},
    'Current': {0: 56, 1: 58, 2: 55}
}
df = pd.DataFrame(data)

现在您应该转换数据框,以便您拥有所需的列 melt:

df = df.melt(["Time"], var_name="parameter", value_name="parameter_value")

此时您应该将 parameter 值映射到外部 id。我将以 params 为例:

params = {"Temperature": 1, "Voltage": 2, "Current": 3}
df["parameter"] = df["parameter"].map(params)

此时数据框将如下所示:

   Time  parameter  parameter_value
0   0.0          1              7.8
1   0.1          1              7.9
2   0.2          1              7.6
3   0.0          2             14.0
4   0.1          2             12.0
5   0.2          2             15.0
6   0.0          3             56.0
7   0.1          3             58.0
8   0.2          3             55.0

现在要使用 pandas 导出,您可以使用:

import sqlalchemy as sa
engine = sa.create_engine("use your connection data")
df.to_sql(name="my_table", con=engine, if_exists="append", index=False)

但是我用的时候速度不够快,不能满足我们的要求。所以我建议你使用 cursor.copy_from insted 因为它更快:

from io import StringIO

output = StringIO()
df.to_csv(output, sep=';', header=False, index=False, columns=df.columns)
output.getvalue()
# jump to start of stream
output.seek(0)

# Insert df into postgre
connection = engine.raw_connection()
with connection.cursor() as cursor:
    cursor.copy_from(output, "my_table", sep=';', null="NULL", columns=(df.columns))
    connection.commit()

我们尝试了数百万次,这是使用 PostgreSQL.

最快的方法