使用参数手动触发气流 DAG,然后传递给 python 函数

Trigger airflow DAG manually with parameter and pass then into python function

我想将参数传递给 airflow DAG 并在 python 函数中使用它们。我可以将参数用于 bash 运算符,但我找不到任何参考将它们用作 python 函数。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago

#Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))

#Parameter
owner="{{ dag_run.conf['owner'] }}"
table="{{ dag_run.conf['table'] }}"

run_this="echo "+owner+"."+table

def test_func(owner,table):
    print(owner+"."+table)

task1 = BashOperator(
    task_id='test_task1',
    bash_command=run_this,
    dag=dag,
    queue='cdp_node53',
) 

task2 = PythonOperator(
    task_id='test_task2',
   python_callable=test_func(owner,table),
    dag=dag,
    queue='cdp_node53',
) 

我想在触发 DAG 时将下面作为参数传递。 “task1”对我来说很好用。我需要使“task2”可行。请指导我更正上面的代码,以便我可以将参数传递给它。

{"owner":"test_owner","table":"test_table"}

要将参数传递给 PythonOperator,您应该使用 op_args(对于位置参数)或 op_kwargs(对于关键字参数)。这两个参数也是模板字段,因此值也可以是 Jinja 表达式。

使用 op_kwargs 重构您的代码:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


#Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))

#Parameter
owner="{{ dag_run.conf['owner'] }}"
table="{{ dag_run.conf['table'] }}"

run_this="echo "+owner+"."+table

def test_func(owner,table):
    print(owner+"."+table)

task1 = BashOperator(
    task_id='test_task1',
    bash_command=run_this,
    dag=dag,
    queue='cdp_node53',
)

task2 = PythonOperator(
    task_id='test_task2',
    python_callable=test_func,
    op_kwargs={"owner": owner, "table": table},
    dag=dag,
    queue='cdp_node53',
)

这两个任务现在都会记录 INFO - test_owner.test_table