来自临时阶段的雪花更新

Snowflake upsert from temp stage

我有一个 json 格式的数据:

api_data = [{'ID': '1314420000c28b88d31115b9d8530bb1', 'NAME': 'Dummy User', 'STATUS': 'ACTIVE'}]

我有这些专栏。

columns = ['ID', 'NAME', 'STATUS']

我正在将数据作为 csv 文件加载到雪花中的暂存区,并将数据作为合并命令插入到主 table 中。 Id 是我的主键。 以下是我的代码:

def test_merger(self, api_data):
        columns = [x for x in api_data[0].keys()]


        try:
            with TemporaryDirectory(prefix=f'test_USER_') as tmpdir:
                df = pd.DataFrame(api_data)


                df.to_csv(tmpdir + f'/test_USER.csv', sep='^', index=False, columns=columns)


                stage_name = f"USER_{snowflake_client.generate_random_string()}"
                create_stage = f"CREATE TEMPORARY STAGE {stage_name} COMMENT = 'TEMPORARY STAGE FOR USER DATA LOAD'"
                snowflake_client.run("ALTER SESSION SET TIMEZONE = 'UTC';")
                snowflake_client.run(create_stage)

                snowflake_client.run(f"put file://{tmpdir}/* @{stage_name} PARALLEL=16")

                snowflake_client.run(
                    f"MERGE INTO USER USING (SELECT   TID,   TNAME,   TSTATUS FROM @{stage_name})  TEMPSTAGE"
                    f"ON USER.ID = TEMPSTAGE.TID WHEN MATCHED THEN UPDATE SET USER.NAME = TEMPSTAGE.TNAME, USER.STATUS = TEMPSTAGE.TSTATUS "
                    "WHEN NOT MATCHED THEN INSERT (ID, NAME, STATUS) VALUES (TEMPSTAGE.TID, TEMPSTAGE.TNAME, TEMPSTAGE.TSTATUS);")

        except Exception as e:
            logger.error(f"ERROR {e}")
            raise e

正在正确创建 csv 文件:

ID^NAME^STATUS
1314420000c28b88d31115b9d8530bb1^Dummy User^ACTIVE

我的用户 table 被错误填充为:

ID                      NAME                STATUS
ID^NAME^STATUS

我想要的是:

ID                                        NAME                STATUS
1314420000c28b88d31115b9d8530bb1          Dummy User          Active

我做错了什么?

我假设您文件中的字段分隔符是 ^ - 基于您显示为正在加载的数据。如果是这种情况,那么您将需要使用一种文件格式,该格式可以针对您的舞台定义,也可以在您的 select 语句

中定义

File formats when querying stages