如何在Kedro Node中使用SQL Server Bulk Insert?
How to use SQL Server Bulk Insert in Kedro Node?
我正在使用 Kedro 管理数据管道,在最后一步我有一个巨大的 csv 文件存储在 S3 存储桶中,我需要将它加载回 SQL 服务器。
我通常会使用 bulk insert,但不太确定如何将其放入 kedro 模板。这是在 catalog.yml
中配置的目标 table 和 S3 存储桶
flp_test:
type: pandas.SQLTableDataSet
credentials: dw_dev_credentials
table_name: flp_tst
load_args:
schema: 'dwschema'
save_args:
schema: 'dwschema'
if_exists: 'replace'
bulk_insert_input:
type: pandas.CSVDataSet
filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
credentials: dev_s3
def insert_data(self, conn, csv_file_nm, db_table_nm):
qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV')"
# Execute the query
cursor = conn.cursor()
success = cursor.execute(qry)
conn.commit()
cursor.close
- 如何将
csv_file_nm
指向我的 bulk_insert_input
S3 目录?
- 是否有间接访问
dw_dev_credentials
进行插入的正确方法?
Kedro 的 pandas.SQLTableDataSet.html uses the pandas.to_sql 方法不变。要按原样使用它,您需要一个 pandas.CSVDataSet
到 node
,然后写入目标 pandas.SQLDataTable
数据集,以便将其写入 SQL。如果您有可用的 Spark,这将比 Pandas.
更快
为了使用内置的 BULK INSERT
查询,我认为您需要定义一个 custom dataset.
我正在使用 Kedro 管理数据管道,在最后一步我有一个巨大的 csv 文件存储在 S3 存储桶中,我需要将它加载回 SQL 服务器。
我通常会使用 bulk insert,但不太确定如何将其放入 kedro 模板。这是在 catalog.yml
flp_test:
type: pandas.SQLTableDataSet
credentials: dw_dev_credentials
table_name: flp_tst
load_args:
schema: 'dwschema'
save_args:
schema: 'dwschema'
if_exists: 'replace'
bulk_insert_input:
type: pandas.CSVDataSet
filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
credentials: dev_s3
def insert_data(self, conn, csv_file_nm, db_table_nm):
qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV')"
# Execute the query
cursor = conn.cursor()
success = cursor.execute(qry)
conn.commit()
cursor.close
- 如何将
csv_file_nm
指向我的bulk_insert_input
S3 目录? - 是否有间接访问
dw_dev_credentials
进行插入的正确方法?
Kedro 的 pandas.SQLTableDataSet.html uses the pandas.to_sql 方法不变。要按原样使用它,您需要一个 pandas.CSVDataSet
到 node
,然后写入目标 pandas.SQLDataTable
数据集,以便将其写入 SQL。如果您有可用的 Spark,这将比 Pandas.
为了使用内置的 BULK INSERT
查询,我认为您需要定义一个 custom dataset.