来自临时阶段的雪花更新
Snowflake upsert from temp stage
我有一个 json 格式的数据:
api_data = [{'ID': '1314420000c28b88d31115b9d8530bb1', 'NAME': 'Dummy User', 'STATUS': 'ACTIVE'}]
我有这些专栏。
columns = ['ID', 'NAME', 'STATUS']
我正在将数据作为 csv 文件加载到雪花中的暂存区,并将数据作为合并命令插入到主 table 中。 Id 是我的主键。
以下是我的代码:
def test_merger(self, api_data):
columns = [x for x in api_data[0].keys()]
try:
with TemporaryDirectory(prefix=f'test_USER_') as tmpdir:
df = pd.DataFrame(api_data)
df.to_csv(tmpdir + f'/test_USER.csv', sep='^', index=False, columns=columns)
stage_name = f"USER_{snowflake_client.generate_random_string()}"
create_stage = f"CREATE TEMPORARY STAGE {stage_name} COMMENT = 'TEMPORARY STAGE FOR USER DATA LOAD'"
snowflake_client.run("ALTER SESSION SET TIMEZONE = 'UTC';")
snowflake_client.run(create_stage)
snowflake_client.run(f"put file://{tmpdir}/* @{stage_name} PARALLEL=16")
snowflake_client.run(
f"MERGE INTO USER USING (SELECT TID, TNAME, TSTATUS FROM @{stage_name}) TEMPSTAGE"
f"ON USER.ID = TEMPSTAGE.TID WHEN MATCHED THEN UPDATE SET USER.NAME = TEMPSTAGE.TNAME, USER.STATUS = TEMPSTAGE.TSTATUS "
"WHEN NOT MATCHED THEN INSERT (ID, NAME, STATUS) VALUES (TEMPSTAGE.TID, TEMPSTAGE.TNAME, TEMPSTAGE.TSTATUS);")
except Exception as e:
logger.error(f"ERROR {e}")
raise e
正在正确创建 csv 文件:
ID^NAME^STATUS
1314420000c28b88d31115b9d8530bb1^Dummy User^ACTIVE
我的用户 table 被错误填充为:
ID NAME STATUS
ID^NAME^STATUS
我想要的是:
ID NAME STATUS
1314420000c28b88d31115b9d8530bb1 Dummy User Active
我做错了什么?
我假设您文件中的字段分隔符是 ^ - 基于您显示为正在加载的数据。如果是这种情况,那么您将需要使用一种文件格式,该格式可以针对您的舞台定义,也可以在您的 select 语句
中定义
我有一个 json 格式的数据:
api_data = [{'ID': '1314420000c28b88d31115b9d8530bb1', 'NAME': 'Dummy User', 'STATUS': 'ACTIVE'}]
我有这些专栏。
columns = ['ID', 'NAME', 'STATUS']
我正在将数据作为 csv 文件加载到雪花中的暂存区,并将数据作为合并命令插入到主 table 中。 Id 是我的主键。 以下是我的代码:
def test_merger(self, api_data):
columns = [x for x in api_data[0].keys()]
try:
with TemporaryDirectory(prefix=f'test_USER_') as tmpdir:
df = pd.DataFrame(api_data)
df.to_csv(tmpdir + f'/test_USER.csv', sep='^', index=False, columns=columns)
stage_name = f"USER_{snowflake_client.generate_random_string()}"
create_stage = f"CREATE TEMPORARY STAGE {stage_name} COMMENT = 'TEMPORARY STAGE FOR USER DATA LOAD'"
snowflake_client.run("ALTER SESSION SET TIMEZONE = 'UTC';")
snowflake_client.run(create_stage)
snowflake_client.run(f"put file://{tmpdir}/* @{stage_name} PARALLEL=16")
snowflake_client.run(
f"MERGE INTO USER USING (SELECT TID, TNAME, TSTATUS FROM @{stage_name}) TEMPSTAGE"
f"ON USER.ID = TEMPSTAGE.TID WHEN MATCHED THEN UPDATE SET USER.NAME = TEMPSTAGE.TNAME, USER.STATUS = TEMPSTAGE.TSTATUS "
"WHEN NOT MATCHED THEN INSERT (ID, NAME, STATUS) VALUES (TEMPSTAGE.TID, TEMPSTAGE.TNAME, TEMPSTAGE.TSTATUS);")
except Exception as e:
logger.error(f"ERROR {e}")
raise e
正在正确创建 csv 文件:
ID^NAME^STATUS
1314420000c28b88d31115b9d8530bb1^Dummy User^ACTIVE
我的用户 table 被错误填充为:
ID NAME STATUS
ID^NAME^STATUS
我想要的是:
ID NAME STATUS
1314420000c28b88d31115b9d8530bb1 Dummy User Active
我做错了什么?
我假设您文件中的字段分隔符是 ^ - 基于您显示为正在加载的数据。如果是这种情况,那么您将需要使用一种文件格式,该格式可以针对您的舞台定义,也可以在您的 select 语句
中定义