将 Airflow PostgresOperator 输出保存到 Pandas Dataframe
Saving Airflow PostgresOperator output to Pandas Dataframe
我使用以下 PostgresOperator 运行 SQL 查询。我想看看我是否可以将此气流 DAG 的输出保存为 Pandas Dataframe。
section_1 = PostgresOperator(
task_id='task_id',
default_args=args,
postgres_conn_id="db_conn_id",
sql="fetching_data.sql",
dag=dag)
SQL 查询:
select cast_id, prod_id, name from sales;
我希望将输出保存为 Pandas 数据框。
您不能在运算符之间传递数据帧。 Airflow 确实提供了以 Xcoms 的形式在任务(运算符)之间传递元数据的能力,但您不应该使用它来传递大量数据。
要与数据框交互,您需要使用 PostgresHook:
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
def my_task():
hook = PostgresHook(postgres_conn_id="db_conn_id")
df = hook.get_pandas_df(sql="select cast_id, prod_id, name from sales;")
# do what you need with the df....
run_this = PythonOperator(
task_id='postgres_task',
python_callable=my_task,
)
如果你想从文件中读取查询,你也可以这样做:
class SQLPythonOperator(PythonOperator):
template_ext = PythonOperator.template_ext + ('.sql',)
def my_task(**context):
query = context['templates_dict']['query']
hook = PostgresHook(postgres_conn_id="db_conn_id")
df = hook.get_pandas_df(sql=query)
# do what you need with the df....
run_this = SQLPythonOperator(
task_id='postgres_task',
python_callable=my_task,
templates_dict={'query': 'fetching_data.sql'},
)
或者,您可以编写自己的自定义运算符来实现您希望的逻辑。
我使用以下 PostgresOperator 运行 SQL 查询。我想看看我是否可以将此气流 DAG 的输出保存为 Pandas Dataframe。
section_1 = PostgresOperator(
task_id='task_id',
default_args=args,
postgres_conn_id="db_conn_id",
sql="fetching_data.sql",
dag=dag)
SQL 查询:
select cast_id, prod_id, name from sales;
我希望将输出保存为 Pandas 数据框。
您不能在运算符之间传递数据帧。 Airflow 确实提供了以 Xcoms 的形式在任务(运算符)之间传递元数据的能力,但您不应该使用它来传递大量数据。
要与数据框交互,您需要使用 PostgresHook:
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
def my_task():
hook = PostgresHook(postgres_conn_id="db_conn_id")
df = hook.get_pandas_df(sql="select cast_id, prod_id, name from sales;")
# do what you need with the df....
run_this = PythonOperator(
task_id='postgres_task',
python_callable=my_task,
)
如果你想从文件中读取查询,你也可以这样做:
class SQLPythonOperator(PythonOperator):
template_ext = PythonOperator.template_ext + ('.sql',)
def my_task(**context):
query = context['templates_dict']['query']
hook = PostgresHook(postgres_conn_id="db_conn_id")
df = hook.get_pandas_df(sql=query)
# do what you need with the df....
run_this = SQLPythonOperator(
task_id='postgres_task',
python_callable=my_task,
templates_dict={'query': 'fetching_data.sql'},
)
或者,您可以编写自己的自定义运算符来实现您希望的逻辑。