如何在Kedro Node中使用SQL Server Bulk Insert?

How to use SQL Server Bulk Insert in Kedro Node?

我正在使用 Kedro 管理数据管道,在最后一步我有一个巨大的 csv 文件存储在 S3 存储桶中,我需要将它加载回 SQL 服务器。

我通常会使用 bulk insert,但不太确定如何将其放入 kedro 模板。这是在 catalog.yml

中配置的目标 table 和 S3 存储桶
flp_test:
  type: pandas.SQLTableDataSet
  credentials: dw_dev_credentials
  table_name: flp_tst
  load_args:
    schema: 'dwschema'
  save_args:
    schema: 'dwschema'
    if_exists: 'replace'

bulk_insert_input:
   type: pandas.CSVDataSet
   filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
   credentials: dev_s3


def insert_data(self, conn, csv_file_nm, db_table_nm):
    qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV')"
    # Execute the query
    cursor = conn.cursor()
    success = cursor.execute(qry)
    conn.commit()
    cursor.close

Kedro 的 pandas.SQLTableDataSet.html uses the pandas.to_sql 方法不变。要按原样使用它,您需要一个 pandas.CSVDataSetnode,然后写入目标 pandas.SQLDataTable 数据集,以便将其写入 SQL。如果您有可用的 Spark,这将比 Pandas.

更快

为了使用内置的 BULK INSERT 查询,我认为您需要定义一个 custom dataset.