Error trying to get upsert behavior in Airflow PostgresHook
I'm using the following to update values in a table where the key is (team, year):
pg_conn.insert_rows('kenpom',
                    list(df.itertuples(index=False)),
                    target_fields=['rank', 'team', 'ortg', 'drtg', 'year'],
                    replace=True)
My assumption was that replace=True would make it behave like an upsert, but I get the following error:
[2021-09-10 12:36:51,901] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/evan/airflow/dags/top_lines_daily_pipeline.py", line 28, in _update_kenpom_table
    pg_conn.insert_rows('kenpom',
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 305, in insert_rows
    sql = self._generate_insert_sql(table, values, target_fields, replace, **kwargs)
  File "/home/evan/.local/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py", line 239, in _generate_insert_sql
    raise ValueError("PostgreSQL ON CONFLICT upsert syntax requires an unique index")
ValueError: PostgreSQL ON CONFLICT upsert syntax requires an unique index
I don't understand ValueError: PostgreSQL ON CONFLICT upsert syntax requires an unique index, since (team, year) is a unique index in the table. Am I missing something here?
You are missing the replace_index argument in your insert_rows call (for your table, replace_index=['team', 'year']).
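PostgresHook behaves differently from other hooks because PostgreSQL's upsert syntax is different. For most hooks, replace=True is enough: it changes the SQL from INSERT INTO to REPLACE INTO. However, REPLACE INTO is not valid in PostgreSQL (source code).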
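To see why replace=True alone cannot work for PostgreSQL, here is a minimal sketch of how a generic DB-API hook might build its statement. The function name generic_insert_sql is hypothetical and only approximates the base hook behavior described above; it is not Airflow's actual code.

```python
from typing import Optional, Sequence


def generic_insert_sql(table: str, n_values: int,
                       target_fields: Optional[Sequence[str]], replace: bool) -> str:
    """Hypothetical approximation of a generic hook's INSERT builder."""
    placeholders = ",".join(["%s"] * n_values)
    columns = f"({', '.join(target_fields)})" if target_fields else ""
    # replace=True only swaps the verb; PostgreSQL has no REPLACE INTO statement
    verb = "REPLACE INTO" if replace else "INSERT INTO"
    return f"{verb} {table} {columns} VALUES ({placeholders})"


print(generic_insert_sql("kenpom", 5, ["rank", "team", "ortg", "drtg", "year"], replace=True))
# prints: REPLACE INTO kenpom (rank, team, ortg, drtg, year) VALUES (%s,%s,%s,%s,%s)
```

MySQL would accept that statement, but PostgreSQL rejects it, which is why the Postgres hook generates ON CONFLICT instead.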
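That is why PostgresHook has its own implementation, which needs an extra argument: replace_index. replace_index is a column name, or a list of column names, used as the index (conflict target) of the ON CONFLICT clause.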
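The error in the question comes from this step: with replace=True but no replace_index, there is no conflict target to put in the statement. A minimal sketch of how such a check could look; the helper normalize_replace_index is hypothetical, and only the error message is taken from the traceback above.

```python
from typing import List, Optional, Sequence, Union


def normalize_replace_index(replace_index: Optional[Union[str, Sequence[str]]]) -> List[str]:
    """Hypothetical helper: turn replace_index into a list of column names."""
    if replace_index is None:
        # Without a conflict target, ON CONFLICT (...) cannot be generated
        raise ValueError("PostgreSQL ON CONFLICT upsert syntax requires an unique index")
    if isinstance(replace_index, str):
        # A single column name may be passed as a plain string
        return [replace_index]
    return list(replace_index)


print(normalize_replace_index(["team", "year"]))  # ['team', 'year']
```

Note that the check only looks at the argument; it never inspects the table, so having a unique index in the database does not by itself avoid the error.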
The statement you want to produce is:
INSERT INTO ... ON CONFLICT ({}) DO UPDATE SET {}
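Filling in both placeholders for the kenpom table can be sketched as follows. The function postgres_upsert_sql is hypothetical, modeled on the statement shape above rather than copied from the provider: replace_index goes inside ON CONFLICT (...), and every other target field is overwritten from PostgreSQL's excluded pseudo-row.

```python
from typing import List, Sequence


def postgres_upsert_sql(table: str, target_fields: Sequence[str],
                        replace_index: Sequence[str]) -> str:
    """Hypothetical builder for INSERT ... ON CONFLICT (...) DO UPDATE SET ..."""
    placeholders = ",".join(["%s"] * len(target_fields))
    index_cols = set(replace_index)
    # Conflict-target columns identify the row, so only the other columns are overwritten
    updates: List[str] = [f"{col} = excluded.{col}"
                          for col in target_fields if col not in index_cols]
    conflict = f"ON CONFLICT ({', '.join(replace_index)})"
    # If every column is part of the key there is nothing left to update
    action = f"DO UPDATE SET {', '.join(updates)}" if updates else "DO NOTHING"
    return (f"INSERT INTO {table} ({', '.join(target_fields)}) "
            f"VALUES ({placeholders}) {conflict} {action}")


print(postgres_upsert_sql("kenpom", ["rank", "team", "ortg", "drtg", "year"], ["team", "year"]))
# prints: INSERT INTO kenpom (rank, team, ortg, drtg, year) VALUES (%s,%s,%s,%s,%s)
#         ON CONFLICT (team, year) DO UPDATE SET rank = excluded.rank, ortg = excluded.ortg, drtg = excluded.drtg
# (a single line in the real output)
```

The DO NOTHING fallback is a design choice of this sketch to avoid emitting an empty SET list.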
replace_index supplies the columns that go inside ON CONFLICT ({}).
You can check the source code that generates it.
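Once replace_index is passed, the database still needs a real unique index or constraint on (team, year), otherwise it rejects the ON CONFLICT target. The upsert semantics can be tried without a PostgreSQL server using Python's built-in sqlite3 as a stand-in. This assumes SQLite 3.24 or newer, which accepts the same ON CONFLICT (...) DO UPDATE SET ... = excluded.... form; the index name kenpom_team_year and the sample rows are hypothetical.

```python
import sqlite3
from typing import List, Tuple

# Same shape as the PostgreSQL statement, but with SQLite's "?" placeholders
UPSERT = (
    "INSERT INTO kenpom (rank, team, ortg, drtg, year) VALUES (?, ?, ?, ?, ?) "
    "ON CONFLICT (team, year) DO UPDATE SET "
    "rank = excluded.rank, ortg = excluded.ortg, drtg = excluded.drtg"
)
CREATE_TABLE = "CREATE TABLE kenpom (rank INTEGER, team TEXT, ortg REAL, drtg REAL, year INTEGER)"


def upsert_demo() -> List[Tuple[int, str, int]]:
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(CREATE_TABLE)
        # The ON CONFLICT target must match a unique index or constraint
        conn.execute("CREATE UNIQUE INDEX kenpom_team_year ON kenpom (team, year)")
        # Hypothetical sample rows, for illustration only
        conn.executemany(UPSERT, [(1, "team_a", 110.0, 90.0, 2021),
                                  (2, "team_b", 108.0, 92.0, 2021)])
        # Same (team, year) key again: the existing row is updated, not duplicated
        conn.execute(UPSERT, (5, "team_b", 107.0, 93.0, 2021))
        return conn.execute("SELECT rank, team, year FROM kenpom ORDER BY team").fetchall()
    finally:
        conn.close()


def upsert_without_index() -> str:
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(CREATE_TABLE)
        # No unique index on (team, year): the database rejects the statement
        conn.execute(UPSERT, (1, "team_a", 110.0, 90.0, 2021))
        return "no error"
    except sqlite3.Error as exc:
        return str(exc)
    finally:
        conn.close()


print(upsert_demo())  # [(1, 'team_a', 2021), (5, 'team_b', 2021)]
print(upsert_without_index())
```

The second function shows the database-side counterpart of the Airflow error: Airflow's ValueError is raised before any SQL is sent, while a missing unique index is only detected by the database itself.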