Error trying to get upsert behavior in Airflow PostgresHook

I'm using the following to update values in a table whose key is (team, year):

pg_conn.insert_rows('kenpom',
                    list(df.itertuples(index=False)),
                    target_fields=['rank',
                                   'team',
                                   'ortg',
                                   'drtg',
                                   'year'],
                    replace=True)

I assumed that passing replace=True would make it behave like an upsert, but I get the following error:

[2021-09-10 12:36:51,901] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/evan/airflow/dags/top_lines_daily_pipeline.py", line 28, in _update_kenpom_table
    pg_conn.insert_rows('kenpom',
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 305, in insert_rows
    sql = self._generate_insert_sql(table, values, target_fields, replace, **kwargs)
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py", line 239, in _generate_insert_sql
    raise ValueError("PostgreSQL ON CONFLICT upsert syntax requires an unique index")
ValueError: PostgreSQL ON CONFLICT upsert syntax requires an unique index

I don't understand "ValueError: PostgreSQL ON CONFLICT upsert syntax requires an unique index", because the table does have a unique index. Am I missing something here?

You are missing the replace_index argument in your call.

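PostgresHook behaves differently from other hooks because PostgreSQL's syntax is different. For most hooks, replace=True is enough: it changes the SQL from INSERT INTO to REPLACE INTO. That is not valid in PostgreSQL (source code), so PostgresHook has its own implementation, which needs an additional argument: replace_index. replace_index is a column name, or a list of column names, used as the index in the ON CONFLICT clause.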

The statement you want to generate is:

INSERT INTO ... ON CONFLICT ({}) DO UPDATE SET {}

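replace_index supplies the columns that go inside ON CONFLICT ({}). The hook does not look up the table's unique index on its own, so even though your table has one on (team, year), you still have to name those columns. PostgreSQL in turn requires a unique index or constraint on exactly the columns you list.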
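To make the role of replace_index concrete, here is a minimal, simplified sketch of how such an upsert statement can be assembled. It is modeled on the behavior described above, not copied from Airflow: the function name build_upsert_sql is hypothetical, and falling back to DO NOTHING when every column is part of the key is this sketch's own choice.

```python
from typing import Optional, Sequence, Union


def build_upsert_sql(
    table: str,
    target_fields: Sequence[str],
    replace_index: Optional[Union[str, Sequence[str]]],
) -> str:
    """Build a PostgreSQL INSERT ... ON CONFLICT ... DO UPDATE statement.

    Hypothetical, simplified sketch modeled on PostgresHook's upsert
    generation; it is not Airflow's actual implementation.
    """
    if not target_fields:
        raise ValueError("ON CONFLICT upsert syntax requires column names")
    if replace_index is None:
        # The situation from the question: replace=True without replace_index.
        raise ValueError("ON CONFLICT upsert syntax requires a unique index")
    # Accept either a single column name or a list of column names.
    if isinstance(replace_index, str):
        replace_index = [replace_index]
    index_set = set(replace_index)

    columns = ", ".join(target_fields)
    placeholders = ", ".join(["%s"] * len(target_fields))
    sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

    # Non-key columns are overwritten with the values of the row that conflicted.
    updates = ", ".join(
        f"{col} = excluded.{col}" for col in target_fields if col not in index_set
    )
    conflict_target = ", ".join(replace_index)
    if updates:
        sql += f" ON CONFLICT ({conflict_target}) DO UPDATE SET {updates}"
    else:
        # Sketch's own choice: nothing left to update, so skip the duplicate row.
        sql += f" ON CONFLICT ({conflict_target}) DO NOTHING"
    return sql


if __name__ == "__main__":
    print(build_upsert_sql("kenpom", ["rank", "team", "ortg", "drtg", "year"], ["team", "year"]))
```

For the question's table, the key columns team and year end up in the ON CONFLICT clause, and only rank, ortg and drtg are updated. In the actual call this corresponds to adding replace_index=['team', 'year'] next to replace=True in pg_conn.insert_rows(...).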

You can check the source code that generates it.