如何将 PostgreSQL 查询结果传递给 Airflow 中的变量? (Postgres Operator 或 Postgres Hook)
How to pass the PostgreSQL query result into a variable in Airflow? (Postgres Operator or Postgres Hook)
我打算使用 PostgreSQL 作为我的任务元信息提供者,所以我想 运行 一些查询并获取一些数据并将其像填充变量一样传递给另一个任务。
问题是当我使用 PostgresHook 时,我得到了数据,但是它在我无法访问的 python 方法中,实际上我看到了下面的行
[2021-08-23 13:00:12,628] {python.py:151} INFO - Done. Returned value was: [[1, "inf_account",....]]
这是我的部分代码:
def _query_postgres(**context):
"""
Queries Postgres and returns a cursor to the results.
"""
postgres = PostgresHook(postgres_conn_id="aramis_postgres_connection")
conn = postgres.get_conn()
cursor = conn.cursor()
mark_williams = cursor.execute(" SELECT * FROM public.aramis_meta_task; ")
# iterate over to get a list of dicts
details_dicts = [doc for doc in cursor]
# serialize to json string
details_json_string = json.dumps(details_dicts, default=json_util.default)
task_instance = context['task_instance']
task_instance.xcom_push(key="my_value", value=details_json_string)
return details_json_string
但我不知道应该使用哪个变量来访问它或如何将它推送到 XCOM,以便我可以将该返回值用作另一个 bashoperator 任务(例如 Spark)的参数。
PostgresOperator 另一方面,结果只有 returns None
。
PostgresOperator 没有 return 任何值,因此很遗憾,您不能使用它来传递数据。您必须实现自己的运算符,为此您确实可以使用 PostgresHook。
关于您的代码,有几点需要注意:
- “返回值是”日志是从 PythonOperator 输出的吗?
- 您可以“显式”推送到您使用
xcom_push()
显示的 XCom,但是 return 一个值也会自动推送到 XCom,因此您的输出将两次存储在 XCom 中。
- 您可以使用
xcom_pull()
“拉取”XCom 值,更多详细信息请参见:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
- 您可以使用
cursor.fetchall()
从 Postgres 游标获取所有输出,我这里有一个类似的示例(它将输出写入本地磁盘):https://github.com/godatadriven/airflow-testing-examples/blob/master/src/testing_examples/operators/postgres_to_local_operator.py#L35-L41
- 小心处理大数据和 XComs。默认情况下,XComs 存储在 Airflow Metastore 中,您不希望在那里存储太大的数据。或者,您可以配置自定义 XCom 后端,允许您使用例如用于 XCom 存储的 AWS S3:https://www.astronomer.io/guides/custom-xcom-backends
XComs
背后的技巧是你 push
他们在一个任务中 pull
它在另一个任务中。如果你想在 bash 运算符中使用你推入 _query_postgres
函数的 XCom
你可以使用这样的东西:
puller = BashOperator(
task_id="do_something_postgres_result",
bash_command="some-bash-command {{ task_instance.xcom_pull(key='my_value', task_ids='query_postgres_task_id_here') }}",
dag=dag)
您需要将 bash_command
替换为适当的,并将 task_ids
从 xcom_pull()
更改为从您创建的任务中设置 task_id
调用 _query_postgres
函数。
关于PostgresOperator
,returnsNone
没关系。它不是用于数据提取(即使你 运行 一个 SELECT
查询。你用 PostgresHook
实现它的方式是好的。
一些值得理解的好资源XComs
:
我打算使用 PostgreSQL 作为我的任务元信息提供者,所以我想 运行 一些查询并获取一些数据并将其像填充变量一样传递给另一个任务。 问题是当我使用 PostgresHook 时,我得到了数据,但是它在我无法访问的 python 方法中,实际上我看到了下面的行
[2021-08-23 13:00:12,628] {python.py:151} INFO - Done. Returned value was: [[1, "inf_account",....]]
这是我的部分代码:
def _query_postgres(**context):
"""
Queries Postgres and returns a cursor to the results.
"""
postgres = PostgresHook(postgres_conn_id="aramis_postgres_connection")
conn = postgres.get_conn()
cursor = conn.cursor()
mark_williams = cursor.execute(" SELECT * FROM public.aramis_meta_task; ")
# iterate over to get a list of dicts
details_dicts = [doc for doc in cursor]
# serialize to json string
details_json_string = json.dumps(details_dicts, default=json_util.default)
task_instance = context['task_instance']
task_instance.xcom_push(key="my_value", value=details_json_string)
return details_json_string
但我不知道应该使用哪个变量来访问它或如何将它推送到 XCOM,以便我可以将该返回值用作另一个 bashoperator 任务(例如 Spark)的参数。
PostgresOperator 另一方面,结果只有 returns None
。
PostgresOperator 没有 return 任何值,因此很遗憾,您不能使用它来传递数据。您必须实现自己的运算符,为此您确实可以使用 PostgresHook。
关于您的代码,有几点需要注意:
- “返回值是”日志是从 PythonOperator 输出的吗?
- 您可以“显式”推送到您使用
xcom_push()
显示的 XCom,但是 return 一个值也会自动推送到 XCom,因此您的输出将两次存储在 XCom 中。 - 您可以使用
xcom_pull()
“拉取”XCom 值,更多详细信息请参见:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html - 您可以使用
cursor.fetchall()
从 Postgres 游标获取所有输出,我这里有一个类似的示例(它将输出写入本地磁盘):https://github.com/godatadriven/airflow-testing-examples/blob/master/src/testing_examples/operators/postgres_to_local_operator.py#L35-L41 - 小心处理大数据和 XComs。默认情况下,XComs 存储在 Airflow Metastore 中,您不希望在那里存储太大的数据。或者,您可以配置自定义 XCom 后端,允许您使用例如用于 XCom 存储的 AWS S3:https://www.astronomer.io/guides/custom-xcom-backends
XComs
背后的技巧是你 push
他们在一个任务中 pull
它在另一个任务中。如果你想在 bash 运算符中使用你推入 _query_postgres
函数的 XCom
你可以使用这样的东西:
puller = BashOperator(
task_id="do_something_postgres_result",
bash_command="some-bash-command {{ task_instance.xcom_pull(key='my_value', task_ids='query_postgres_task_id_here') }}",
dag=dag)
您需要将 bash_command
替换为适当的,并将 task_ids
从 xcom_pull()
更改为从您创建的任务中设置 task_id
调用 _query_postgres
函数。
关于PostgresOperator
,returnsNone
没关系。它不是用于数据提取(即使你 运行 一个 SELECT
查询。你用 PostgresHook
实现它的方式是好的。
一些值得理解的好资源XComs
: