将 Airflow PostgresOperator 输出保存到 Pandas Dataframe

Saving Airflow PostgresOperator output to Pandas Dataframe

我使用以下 PostgresOperator 运行 SQL 查询。我想看看我是否可以将此气流 DAG 的输出保存为 Pandas Dataframe。

section_1 = PostgresOperator(
task_id='task_id',
default_args=args,
postgres_conn_id="db_conn_id",
sql="fetching_data.sql",
dag=dag)

SQL 查询:

select cast_id, prod_id, name from sales;

我希望将输出保存为 Pandas 数据框。

您不能在运算符之间传递数据帧。 Airflow 确实提供了以 Xcoms 的形式在任务(运算符)之间传递元数据的能力,但您不应该使用它来传递大量数据。

要与数据框交互,您需要使用 PostgresHook:

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator

def my_task():
    hook = PostgresHook(postgres_conn_id="db_conn_id")
    df = hook.get_pandas_df(sql="select cast_id, prod_id, name from sales;")
    # do what you need with the df....

run_this = PythonOperator(
    task_id='postgres_task',
    python_callable=my_task,
)

如果你想从文件中读取查询,你也可以这样做:

class SQLPythonOperator(PythonOperator):
    template_ext = PythonOperator.template_ext + ('.sql',)
    
def my_task(**context):
    query = context['templates_dict']['query']
    hook = PostgresHook(postgres_conn_id="db_conn_id")
    df = hook.get_pandas_df(sql=query)
    # do what you need with the df....


run_this = SQLPythonOperator(
    task_id='postgres_task',
    python_callable=my_task,
    templates_dict={'query': 'fetching_data.sql'},
)

或者,您可以编写自己的自定义运算符来实现您希望的逻辑。