如何将 PostgreSQL 查询结果传递给 Airflow 中的变量? (Postgres Operator 或 Postgres Hook)

How to pass the PostgreSQL query result into a variable in Airflow? (Postgres Operator or Postgres Hook)

我打算使用 PostgreSQL 作为我的任务元信息提供者,所以我想 运行 一些查询并获取一些数据并将其像填充变量一样传递给另一个任务。 问题是当我使用 PostgresHook 时,我得到了数据,但是它在我无法访问的 python 方法中,实际上我看到了下面的行

[2021-08-23 13:00:12,628] {python.py:151} INFO - Done. Returned value was: [[1, "inf_account",....]]

这是我的部分代码:

def _query_postgres(**context):
    """
    Queries Postgres and returns a cursor to the results.
    """

    postgres = PostgresHook(postgres_conn_id="aramis_postgres_connection")
    conn = postgres.get_conn()
    cursor = conn.cursor()
    mark_williams = cursor.execute(" SELECT * FROM public.aramis_meta_task; ")

    # iterate over to get a list of dicts
    details_dicts = [doc for doc in cursor]

    # serialize to json string
    details_json_string = json.dumps(details_dicts, default=json_util.default)

    task_instance = context['task_instance']
    task_instance.xcom_push(key="my_value", value=details_json_string)
    return details_json_string

但我不知道应该使用哪个变量来访问它或如何将它推送到 XCOM,以便我可以将该返回值用作另一个 bashoperator 任务(例如 Spark)的参数。

PostgresOperator 另一方面,结果只有 returns None

PostgresOperator 没有 return 任何值,因此很遗憾,您不能使用它来传递数据。您必须实现自己的运算符,为此您确实可以使用 PostgresHook。

关于您的代码,有几点需要注意:

  1. “返回值是”日志是从 PythonOperator 输出的吗?
  2. 您可以“显式”推送到您使用 xcom_push() 显示的 XCom,但是 return 一个值也会自动推送到 XCom,因此您的输出将两次存储在 XCom 中。
  3. 您可以使用 xcom_pull()“拉取”XCom 值,更多详细信息请参见:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
  4. 您可以使用 cursor.fetchall() 从 Postgres 游标获取所有输出,我这里有一个类似的示例(它将输出写入本地磁盘):https://github.com/godatadriven/airflow-testing-examples/blob/master/src/testing_examples/operators/postgres_to_local_operator.py#L35-L41
  5. 小心处理大数据和 XComs。默认情况下,XComs 存储在 Airflow Metastore 中,您不希望在那里存储太大的数据。或者,您可以配置自定义 XCom 后端,允许您使用例如用于 XCom 存储的 AWS S3:https://www.astronomer.io/guides/custom-xcom-backends

XComs 背后的技巧是你 push 他们在一个任务中 pull 它在另一个任务中。如果你想在 bash 运算符中使用你推入 _query_postgres 函数的 XCom 你可以使用这样的东西:

puller = BashOperator(
        task_id="do_something_postgres_result",
        bash_command="some-bash-command {{ task_instance.xcom_pull(key='my_value', task_ids='query_postgres_task_id_here') }}",
        dag=dag)

您需要将 bash_command 替换为适当的,并将 task_idsxcom_pull() 更改为从您创建的任务中设置 task_id调用 _query_postgres 函数。

关于PostgresOperator,returnsNone没关系。它不是用于数据提取(即使你 运行 一个 SELECT 查询。你用 PostgresHook 实现它的方式是好的。

一些值得理解的好资源XComs

  1. https://medium.com/analytics-vidhya/airflow-tricks-xcom-and-subdag-361ff5cd46ff
  2. https://precocityllc.com/blog/airflow-and-xcom-inter-task-communication-use-cases/
  3. https://github.com/apache/airflow/blob/main/airflow/example_dags/example_xcom.py