Airflow 2.1.0 passing variable to another DAG using TriggerDagRunOperator
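We are using Airflow 2.1.0 and want to trigger a DAG and pass a variable (an S3 file name) to it using TriggerDagRunOperator.
I have found examples like this one, which passes static JSON to the next DAG through conf: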
@task()
def trigger_target_dag_task(context):
    TriggerDagRunOperator(
        task_id="trigger_target_dag",
        trigger_dag_id="target_dag",
        conf={"file_name": "test.txt"}
    ).execute(context)
However, I cannot find a current example that builds conf dynamically without using python_callable. This comment seems close:
https://github.com/apache/airflow/pull/6317#issuecomment-859556243
Is this possible?
Updated question:
This approach did not work when I tried it:
@task()
def trigger_dag_task(context):
    TriggerDagRunOperator(
        task_id="trigger_dag_task",
        trigger_dag_id="target_dag",
        conf={"payload": "{{ ti.xcom_pull(task_ids='extract_rss') }}"},
    ).execute(context)
target_dag received the conf value as the literal, unrendered string:
{logging_mixin.py:104} INFO - Remotely received value of {{ ti.xcom_pull(task_ids='extract_rss') }}
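conf is a templated field, so you can use Jinja to pass any variable. Consider this example, based on the official TriggerDagRunOperator example.
If the variable (object_name) is in your scope, you can do this:
Controller DAG: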
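from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_trigger_controller_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval="@once",
    tags=['example'],
)

object_name = "my-object-s3-aws"

# conf is passed to the triggered run and is available there as dag_run.conf
trigger = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",
    trigger_dag_id="example_trigger_target_dag",
    conf={"s3_object": object_name},
    dag=dag,
)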
Target DAG:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_trigger_target_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval=None,
    tags=['example'],
)

def run_this_func(**context):
    # Read the value the controller DAG passed in conf
    print("Remotely received value of {} for key=s3_object".format(
        context["dag_run"].conf["s3_object"]))

run_this = PythonOperator(
    task_id="run_this", python_callable=run_this_func, dag=dag)

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: $message"',
    env={'message': '{{ dag_run.conf["s3_object"] if dag_run else "" }}'},
    dag=dag,
)
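One thing to watch in the target DAG: run_this_func indexes dag_run.conf["s3_object"] directly, which fails if the run was started without that key (for example, a manual trigger with no conf). Below is a minimal sketch of a more defensive lookup. The helper name get_conf_value is my own, and the SimpleNamespace objects are only stand-ins for a real DagRun so that the snippet runs without Airflow.

```python
from types import SimpleNamespace


def get_conf_value(dag_run, key, default=""):
    """Return dag_run.conf[key], or default when the run, conf, or key is missing."""
    # dag_run may be absent, and conf may be None or empty when nothing was passed.
    conf = getattr(dag_run, "conf", None) or {}
    return conf.get(key, default)


# Stand-ins for real DagRun objects (assumption: only the .conf attribute matters here).
triggered = SimpleNamespace(conf={"s3_object": "my-object-s3-aws"})
manual = SimpleNamespace(conf=None)

print(get_conf_value(triggered, "s3_object"))
print(repr(get_conf_value(manual, "s3_object")))
```

Inside a PythonOperator callable you would call it as get_conf_value(context.get("dag_run"), "s3_object").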
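If the variable is stored as an Airflow Variable, you can retrieve it like this (var.json expects the Variable to hold JSON; use var.value.s3_object for a plain string):
conf={"s3_object": "{{ var.json.s3_object }}"}
If it is an XCom from a previous task, you can do this:
conf={"s3_object": "{{ ti.xcom_pull(task_ids='previous_task_id', key='return_value') }}"}
Let me know if that works for you!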
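As for why the attempt in the updated question passed the raw Jinja string through: templated fields are rendered by Airflow just before it runs an operator as a task in the DAG. Instantiating the operator inside another task and calling .execute(context) yourself skips that rendering step. The toy model below illustrates the order of operations. It is not Airflow code: ToyOperator and its regex-based rendering are simplified stand-ins I made up, with method names chosen to mirror the real ones.

```python
import re


class ToyOperator:
    """Toy stand-in for an operator with a templated 'conf' field (not Airflow code)."""

    template_fields = ("conf",)

    def __init__(self, conf):
        self.conf = conf

    def render_template_fields(self, context):
        # Simplified rendering: replace "{{ name }}" with context[name] in each value.
        def render(value):
            return re.sub(
                r"\{\{\s*(\w+)\s*\}\}",
                lambda match: str(context[match.group(1)]),
                value,
            )

        for field in self.template_fields:
            rendered = {k: render(v) for k, v in getattr(self, field).items()}
            setattr(self, field, rendered)

    def execute(self, context):
        # execute() uses the field as it currently is; it does no rendering itself.
        return self.conf


context = {"file_name": "test.txt"}

# Calling execute() directly, as in the question: the template stays a literal string.
direct = ToyOperator(conf={"payload": "{{ file_name }}"}).execute(context)

# Running as a task in this toy model: render first, then execute.
op = ToyOperator(conf={"payload": "{{ file_name }}"})
op.render_template_fields(context)
scheduled = op.execute(context)

print(direct)     # {'payload': '{{ file_name }}'}
print(scheduled)  # {'payload': 'test.txt'}
```

This is why the examples in this answer declare TriggerDagRunOperator as a regular task in the controller DAG instead of wrapping it in a @task function.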
Edit:
Here is a working example, tested on version 2.0.1, that uses xcom_pull in the conf parameter:
Controller DAG:
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def _do_something():
    # The return value is pushed to XCom under the key 'return_value'
    return "my-object-s3-aws"

dag = DAG(
    dag_id="example_trigger_controller_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval="@once",
    tags=['example'],
)

task_1 = PythonOperator(task_id='previous_task_id',
                        python_callable=_do_something,
                        dag=dag)

# The Jinja expression is rendered when Airflow runs this task
trigger = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",
    trigger_dag_id="example_trigger_target_dag",
    conf={
        "s3_object":
        "{{ ti.xcom_pull(task_ids='previous_task_id', key='return_value') }}"},
    dag=dag,
)

task_1 >> trigger
Target DAG:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_trigger_target_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval=None,
    tags=['example'],
)

def run_this_func(**context):
    print("Remotely received value of {} ".format(
        context["dag_run"].conf["s3_object"]))

run_this = PythonOperator(
    task_id="run_this", python_callable=run_this_func, dag=dag)

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: $s3_object"',
    env={'s3_object': '{{ dag_run.conf["s3_object"] if dag_run else "" }}'},
    dag=dag,
)
Log from the run_this task:
[2021-07-15 19:24:11,410] {logging_mixin.py:104} INFO - Remotely received value of my-object-s3-aws
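In the example above the XCom is a plain string, so it passes through the template unchanged. With default template rendering the result of a Jinja expression is always a string, so a dict or list pulled from XCom would arrive in conf as text, not as a structured object. One way around that, sketched below, is to return a JSON string from the upstream task and parse it in the target DAG. The helper names and the sample payload (bucket, keys) are hypothetical, and the snippet simulates the hand-off with plain Python so that it runs without Airflow.

```python
import json


def encode_payload(payload):
    """Upstream side: return a JSON string so it survives string templating."""
    return json.dumps(payload)


def decode_payload(conf, key="payload"):
    """Target side: parse the JSON string found in conf back into Python data."""
    return json.loads(conf[key])


# Simulates what the target DAG would find in dag_run.conf after rendering.
xcom_value = encode_payload({"bucket": "my-bucket", "keys": ["a.csv", "b.csv"]})
received_conf = {"payload": xcom_value}

print(decode_payload(received_conf)["keys"])
```

In the target DAG, the callable would call decode_payload(context["dag_run"].conf) instead of reading the value directly.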