How to pass previous task state as parameter to another task within the Airflow Taskflow API?

I want to get the state of a SparkSubmitOperator, translate it into a value my API understands, and pass it in the payload of a SimpleHttpOperator so that I can update the job status in my database. I would like to do this using the TaskFlow API.

I wrote the code below, but when I try to load it I get this error:

Broken DAG: [/opt/airflow/dags/export/inapp_clicks/export.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 1378, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 1316, in _set_relatives
    task_object.update_relative(self, not upstream)
AttributeError: 'function' object has no attribute 'update_relative'

Code:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import Variable

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

@dag(schedule_interval=None, start_date=datetime.now(), tags=["export", "inapp"])
def export_inapp_clicks():
    DEFAULT_NUM_EXECUTORS = 2
    DEFAULT_EXECUTOR_CORES = 3
    DEFAULT_EXECUTOR_MEMORY = "2g"
    DEFAULT_DRIVER_MEMORY = "1g"

    @task()
    def update_job_status(dag, ti, execution_date):
        jst = dag.get_task("export_inapp_clicks_job_submission")
        jsti = TaskInstance(jst, execution_date)
        xcom_value = ti.xcom_pull(task_ids="export_inapp_clicks_job_submission")

        print("Task:", jst)
        print("Task Instance:", jsti)
        print("Task State:", jsti.current_state())
        print("XCOM Value:", xcom_value)
        
        # TODO: call API via SimpleHttpOperator

    job_submission = SparkSubmitOperator(
        task_id="export_inapp_clicks_job_submission",
        conn_id="yarn",
        name="{{ dag_run.conf['name'] }}",
        conf=Variable.get("export_inapp_clicks_conf", deserialize_json=True),
        jars=Variable.get("export_inapp_clicks_jars"),
        application=Variable.get("pyspark_executor_path"),
        application_args=[
            "--module",
            "export_inapp_clicks.export",
            "--org-id",
            "{{ dag_run.conf['orgId'] }}",
            "--app-id",
            "{{ dag_run.conf['appId'] }}",
            "--inapp-id",
            "{{ dag_run.conf['inappId'] }}",
            "--start-date",
            "{{ dag_run.conf['startDate'] }}",
            "--end-date",
            "{{ dag_run.conf['endDate'] }}",
            "--data-path",
            Variable.get("event_data_path"),
            "--es-nodes",
            Variable.get("es_nodes"),
            "--destination",
            Variable.get("export_inapp_clicks_output"),
            "--explain",
            "--debug",
            "--encode-columns",
            "--log-level",
            "WARN"
        ],
        py_files=Variable.get("export_inapp_clicks_py_files"),
        num_executors=Variable.get("export_inapp_clicks_num_executors", DEFAULT_NUM_EXECUTORS),
        executor_cores=Variable.get("export_inapp_clicks_executor_cores", DEFAULT_EXECUTOR_CORES),
        executor_memory=Variable.get("export_inapp_clicks_executor_memory", DEFAULT_EXECUTOR_MEMORY),
        driver_memory=Variable.get("export_inapp_clicks_driver_memory", DEFAULT_DRIVER_MEMORY),
        status_poll_interval=10
    )

    job_submission >> update_job_status

export_dag = export_inapp_clicks()
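
As a minimal sketch of the immediate cause of the traceback (not from the original post): a @task-decorated function has to be called so that it produces an actual task before it can be wired with >>; using the bare function object is what raises "'function' object has no attribute 'update_relative'".

# Sketch only: call the decorated task to obtain a task/XComArg before
# setting the dependency inside export_inapp_clicks().
# (The function would then need to receive its context via **context kwargs,
# as in the example below, rather than bare positional dag/ti/execution_date
# arguments.)
job_submission >> update_job_status()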

Consider the following example, where the first task corresponds to your SparkSubmitOperator task:

_get_upstream_task is in charge of getting the state of the first task from within the second task, by running a query against the metadata database:

DAG definition, first two tasks:

import json
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.utils.db import provide_session
from airflow.models.taskinstance import TaskInstance
from airflow.providers.http.operators.http import SimpleHttpOperator

@dag(
    default_args= {"owner": "airflow"},
    schedule_interval=None,
    start_date=days_ago(0),
    catchup=False,
    tags=["custom_example", "TaskFlow"],
)
def taskflow_previous_task():
    @provide_session
    def _get_upstream_task(upstream_task_id, dag, execution_date, session=None, **_):
        upstream_ti = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == dag.dag_id,
                TaskInstance.execution_date == execution_date,
                TaskInstance.task_id == upstream_task_id,
            )
            .first()
        )
        return upstream_ti

    @task
    def job_submission_task(**context):
        print(f"Task Id: {context['ti'].task_id}")
        return {"job_data": "something"}

    @task(trigger_rule='all_done')
    def update_job_status(job_data, **context):
        print(f"Data from previous Task: {job_data['job_data']}")
        upstream_ti = _get_upstream_task("job_submission_task", **context)

        print(f"Upstream_ti state: {upstream_ti.state}")
        return upstream_ti.state

    job_results = job_submission_task()
    job_status = update_job_status(job_results)

job_submission_task returns a dict that is passed to update_job_status via XComs using XcomArg, which is part of the TaskFlow API. By doing it this way you avoid performing explicit xcom_pull() and xcom_push() operations.
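
For comparison, a sketch (not part of the DAG above) of the same hand-off done explicitly, without XcomArg:

@task(trigger_rule="all_done")
def update_job_status_explicit(**context):
    # Pull the upstream task's return value from XCom by task_id; this is
    # what XcomArg does for you when job_results is passed in directly.
    job_data = context["ti"].xcom_pull(task_ids="job_submission_task")
    print(f"Data from previous Task: {job_data['job_data']}")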

Once the _get_upstream_task method has given you the TaskInstance object, you can return its state and retrieve it again from the last task, which performs the HTTP request:

Final task, end of the DAG definition:

    task_post_op = SimpleHttpOperator(
        task_id="post_op",
        endpoint="post",
        data=json.dumps({"job_status": f"{job_status}"}),
        headers={"Content-Type": "application/json"},
        log_response=True,
    )
    job_status >> task_post_op


example_dag = taskflow_previous_task()

Since the data parameter of SimpleHttpOperator is templated, interpolating job_status (an XComArg) into the string produces a Jinja expression that pulls the XCom value from the second task at runtime: data=json.dumps({"job_status": f"{job_status}"})
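
If you prefer to write the Jinja expression yourself instead of interpolating the XComArg, a sketch of the same operator (assuming the default return_value XCom key):

task_post_op_explicit = SimpleHttpOperator(
    task_id="post_op_explicit",
    endpoint="post",
    # ti.xcom_pull without a key returns the task's return value, i.e. the
    # state string returned by update_job_status.
    data=json.dumps(
        {"job_status": "{{ ti.xcom_pull(task_ids='update_job_status') }}"}
    ),
    headers={"Content-Type": "application/json"},
    log_response=True,
)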

Execution logs:

Task_1:

AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-20T23:15:15.226853+00:00
[2021-08-20 23:15:17,148] {logging_mixin.py:104} INFO - Task Id: job_submission_task
[2021-08-20 23:15:17,148] {python.py:151} INFO - Done. Returned value was: {'job_data': 'something'}
[2021-08-20 23:15:17,202] {taskinstance.py:1211} INFO - Marking task as SUCCESS. 

Task_2:

AIRFLOW_CTX_DAG_ID=taskflow_previous_task
AIRFLOW_CTX_TASK_ID=update_job_status
AIRFLOW_CTX_EXECUTION_DATE=2021-08-20T23:15:15.226853+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-20T23:15:15.226853+00:00
[2021-08-20 23:15:18,768] {logging_mixin.py:104} INFO - Data from previous Task: something
[2021-08-20 23:15:18,792] {logging_mixin.py:104} INFO - Upstream_ti state: success
[2021-08-20 23:15:18,793] {python.py:151} INFO - Done. Returned value was: success
[2021-08-20 23:15:18,874] {taskinstance.py:1211} INFO - Marking task as SUCCESS.

Task_3:

AIRFLOW_CTX_DAG_ID=taskflow_previous_task
AIRFLOW_CTX_TASK_ID=post_op
AIRFLOW_CTX_EXECUTION_DATE=2021-08-20T23:15:15.226853+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-20T23:15:15.226853+00:00
[2021-08-20 23:15:21,201] {http.py:111} INFO - Calling HTTP method
[2021-08-20 23:15:21,228] {base.py:78} INFO - Using connection to: id: http_default. Host: https://www.httpbin.org, Port: None, Schema: , Login: , Password: None, extra: {}
[2021-08-20 23:15:21,245] {http.py:140} INFO - Sending 'POST' to url: https://www.httpbin.org/post
[2021-08-20 23:15:21,973] {http.py:115} INFO - {
  "args": {}, 
  "data": "{\"job_status\": \"success\"}", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Content-Length": "25", 
    "Content-Type": "application/json", 
    "Host": "www.httpbin.org", 
    "User-Agent": "python-requests/2.25.1", 
    "X-Amzn-Trace-Id": "Root=1-61203789-0136b7557ba4e0116bb5e16d"
  }, 
  "json": {
    "job_status": "success"
  }, 
  "origin": "200.73.153.254", 
  "url": "https://www.httpbin.org/post"
}

[2021-08-20 23:15:22,027] {taskinstance.py:1211} INFO - Marking task as SUCCESS.

Let me know if that works for you; I tried to use as many TaskFlow features as possible.

Sources: Docs1 Docs2

Edit:

  • Added trigger_rule='all_done' to the update_job_status task, so that it runs and reports the upstream state even when the job submission task fails.