Airflow 2.1.0 使用 TriggerDagRunOperator 将变量传递给另一个 DAG

Airflow 2.1.0 passing variable to another DAG using TriggerDagRunOperator

我们正在使用 Airflow 2.1.0 并希望触发 DAG 并使用 TriggerDagRunOperator 将变量(S3 文件名)传递给它。

我找到了这样的例子,可以使用 conf:

将静态 JSON 传递给下一个 DAG
    @task()
    def trigger_target_dag_task(context):
        TriggerDagRunOperator(
            task_id="trigger_target_dag",
            trigger_dag_id="target_dag",
            conf={"file_name": "test.txt"}
        ).execute(context)

但是,我找不到不使用 python_callable 动态创建 conf 的当前示例 - 这似乎很接近:

https://github.com/apache/airflow/pull/6317#issuecomment-859556243

这可能吗?

更新问题:

我用的时候这个方法没有用:

    @task()
    def trigger_dag_task(context):
        TriggerDagRunOperator(
            task_id="trigger_dag_task",
            trigger_dag_id="target_dag",
            conf={"payload": "{{ ti.xcom_pull(task_ids='extract_rss') }}"},
        ).execute(context)

target_dag 收到字符串形式的 conf:

{logging_mixin.py:104} INFO - Remotely received value of {{ ti.xcom_pull(task_ids='extract_rss') }}

Conf 是一个模板化字段,因此您可以使用 Jinja 来传递任何变量。考虑这个基于官方 TriggerDagRunOperator example

的例子

如果变量 (object_name) 在您的范围内,您可以这样做:

控制器 DAG:

dag = DAG(
    dag_id="example_trigger_controller_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval="@once",
    tags=['example'],
)
object_name = "my-object-s3-aws"

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",
    trigger_dag_id="example_trigger_target_dag",
    conf={"s3_object":  object_name},
    dag=dag,
)

目标DAG:

dag = DAG(
    dag_id="example_trigger_target_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval=None,
    tags=['example'],
)


def run_this_func(**context):
    print("Remotely received value of {} for key=message".format(
        context["dag_run"].conf["s3_object"]))


run_this = PythonOperator(
    task_id="run_this", python_callable=run_this_func, dag=dag)

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: $message"',
    env={'message': '{{ dag_run.conf["s3_object"] if dag_run else "" }}'},
    dag=dag,
)

如果变量存储为 Airflow Variable,您可以像这样检索它:

conf={"s3_object": "{{var.json.s3_object}}"}

如果它是上一个任务的 XCom,你可以这样做:

conf={"s3_object": "{{ ti.xcom_pull(task_ids='previous_task_id', key='return_value') }}"

让我知道这是否对您有用!

docs

编辑:

这是一个工作示例,在 2.0.1 版本中测试,在 conf 参数中使用 xcom_pull

控制器 DAG:

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def _do_something():
    return "my-object-s3-aws"

dag = DAG(
    dag_id="example_trigger_controller_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval="@once",
    tags=['example'],
)

task_1 = PythonOperator(task_id='previous_task_id',
                        python_callable=_do_something)

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",
    trigger_dag_id="example_trigger_target_dag",
    conf={
        "s3_object":
        "{{ ti.xcom_pull(task_ids='previous_task_id', key='return_value') }}"},
    dag=dag,
)

task_1 >> trigger

目标DAG:


from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_trigger_target_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval=None,
    tags=['example'],
)


def run_this_func(**context):
    print("Remotely received value of {} ".format(
        context["dag_run"].conf["s3_object"]))


run_this = PythonOperator(
    task_id="run_this", python_callable=run_this_func, dag=dag)

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: $s3_object"',
    env={'s3_object': '{{ dag_run.conf["s3_object"] if dag_run else "" }}'},
    dag=dag,
)

来自 run_this 任务的日志:

[2021-07-15 19:24:11,410] {logging_mixin.py:104} INFO - Remotely received value of my-object-s3-aws