How to pass previous task state as parameter to another task within the Airflow Taskflow API?
I want to get the state of a SparkSubmitOperator, convert it to a value my API understands, and pass it in the payload of a SimpleHttpOperator so I can update the job status in my database. I would like to do this using the TaskFlow API.
I wrote the code below, but I get this error when I try to load it:
Broken DAG: [/opt/airflow/dags/export/inapp_clicks/export.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 1378, in set_downstream
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 1316, in _set_relatives
task_object.update_relative(self, not upstream)
AttributeError: 'function' object has no attribute 'update_relative'
Code:
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator


@dag(schedule_interval=None, start_date=datetime.now(), tags=["export", "inapp"])
def export_inapp_clicks():
    DEFAULT_NUM_EXECUTORS = 2
    DEFAULT_EXECUTOR_CORES = 3
    DEFAULT_EXECUTOR_MEMORY = "2g"
    DEFAULT_DRIVER_MEMORY = "1g"

    @task()
    def update_job_status(dag, ti, execution_date):
        jst = dag.get_task("export_inapp_clicks_job_submission")
        jsti = TaskInstance(jst, execution_date)
        xcom_value = ti.xcom_pull(task_ids="export_inapp_clicks_job_submission")
        print("Task:", jst)
        print("Task Instance:", jsti)
        print("Task State:", jsti.current_state())
        print("XCOM Value:", xcom_value)
        # TODO: call API via SimpleHttpOperator

    job_submission = SparkSubmitOperator(
        task_id="export_inapp_clicks_job_submission",
        conn_id="yarn",
        name="{{ dag_run.conf['name'] }}",
        conf=Variable.get("export_inapp_clicks_conf", deserialize_json=True),
        jars=Variable.get("export_inapp_clicks_jars"),
        application=Variable.get("pyspark_executor_path"),
        application_args=[
            "--module",
            "export_inapp_clicks.export",
            "--org-id",
            "{{ dag_run.conf['orgId'] }}",
            "--app-id",
            "{{ dag_run.conf['appId'] }}",
            "--inapp-id",
            "{{ dag_run.conf['inappId'] }}",
            "--start-date",
            "{{ dag_run.conf['startDate'] }}",
            "--end-date",
            "{{ dag_run.conf['endDate'] }}",
            "--data-path",
            Variable.get("event_data_path"),
            "--es-nodes",
            Variable.get("es_nodes"),
            "--destination",
            Variable.get("export_inapp_clicks_output"),
            "--explain",
            "--debug",
            "--encode-columns",
            "--log-level",
            "WARN"
        ],
        py_files=Variable.get("export_inapp_clicks_py_files"),
        num_executors=Variable.get("export_inapp_clicks_num_executors", DEFAULT_NUM_EXECUTORS),
        executor_cores=Variable.get("export_inapp_clicks_executor_cores", DEFAULT_EXECUTOR_CORES),
        executor_memory=Variable.get("export_inapp_clicks_executor_memory", DEFAULT_EXECUTOR_MEMORY),
        driver_memory=Variable.get("export_inapp_clicks_driver_memory", DEFAULT_DRIVER_MEMORY),
        status_poll_interval=10
    )

    job_submission >> update_job_status
export_dag = export_inapp_clicks()
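A note on the traceback itself: with the TaskFlow API, a @task-decorated function has to be called so it produces an XComArg/task before it can be wired with >>; using the bare function object is what raises 'function' object has no attribute 'update_relative'. A minimal sketch of that change only (my assumption, not the full solution, keeping the rest of the DAG untouched):

from airflow.operators.python import get_current_context

# Inside export_inapp_clicks():
@task()
def update_job_status():
    # Pull the execution context inside the task instead of via positional args
    context = get_current_context()
    ti = context["ti"]
    xcom_value = ti.xcom_pull(task_ids="export_inapp_clicks_job_submission")
    print("XCOM Value:", xcom_value)

job_submission >> update_job_status()  # note the call parentheses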
Consider the following example, where the first task would correspond to your SparkSubmitOperator task. _get_upstream_task is in charge of obtaining the state of the first task from within the second task by running a query against the metadata database.
DAG definition, first two tasks:
import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.utils.db import provide_session
from airflow.models.taskinstance import TaskInstance
from airflow.providers.http.operators.http import SimpleHttpOperator


@dag(
    default_args={"owner": "airflow"},
    schedule_interval=None,
    start_date=days_ago(0),
    catchup=False,
    tags=["custom_example", "TaskFlow"],
)
def taskflow_previous_task():

    @provide_session
    def _get_upstream_task(upstream_task_id, dag, execution_date, session=None, **_):
        upstream_ti = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == dag.dag_id,
                TaskInstance.execution_date == execution_date,
                TaskInstance.task_id == upstream_task_id,
            )
            .first()
        )
        return upstream_ti

    @task
    def job_submission_task(**context):
        print(f"Task Id: {context['ti'].task_id}")
        return {"job_data": "something"}

    @task(trigger_rule='all_done')
    def update_job_status(job_data, **context):
        print(f"Data from previous Task: {job_data['job_data']}")
        upstream_ti = _get_upstream_task("job_submission_task", **context)
        print(f"Upstream_ti state: {upstream_ti.state}")
        return upstream_ti.state

    job_results = job_submission_task()
    job_status = update_job_status(job_results)
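As a side note, in Airflow 2.x the DagRun object available in the task context exposes get_task_instance(), so a sketch of an alternative that skips the manual metadata query (my assumption, not part of the original answer) could look like this:

@task(trigger_rule='all_done')
def update_job_status_alt(job_data, **context):
    # Read the upstream TaskInstance straight from the DagRun in the context
    upstream_ti = context["dag_run"].get_task_instance(task_id="job_submission_task")
    print(f"Upstream state: {upstream_ti.state}")
    return upstream_ti.state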
job_submission_task returns a dict that is passed to update_job_status through XComs by means of XcomArg, which is the TaskFlow API. By doing it this way, you avoid performing explicit xcom_pull() and xcom_push() operations.
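For comparison, without that hand-off the same value would have to be pulled explicitly inside the downstream task, roughly like this (a sketch with a hypothetical task name):

@task(trigger_rule='all_done')
def update_job_status_explicit(**context):
    # Explicit pull of the value returned by job_submission_task
    job_data = context["ti"].xcom_pull(task_ids="job_submission_task")
    print(f"Data from previous Task: {job_data['job_data']}")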
Once you have the TaskInstance object from the _get_upstream_task method, you can return its state and retrieve it again from the last task, which performs the HTTP request.
Final task, end of the DAG definition:
    task_post_op = SimpleHttpOperator(
        task_id="post_op",
        endpoint="post",
        data=json.dumps({"job_status": f"{job_status}"}),
        headers={"Content-Type": "application/json"},
        log_response=True,
    )

    job_status >> task_post_op


example_dag = taskflow_previous_task()
Since the data parameter of SimpleHttpOperator is templated, you can use Jinja to retrieve the XCom value from the second task:
data=json.dumps({"job_status": f"{job_status}"}),
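Equivalently, the same field could reference the XCom with an explicit Jinja expression instead of interpolating the XcomArg (a sketch of my own, assuming the default return_value key):

task_post_op = SimpleHttpOperator(
    task_id="post_op",
    endpoint="post",
    # ti.xcom_pull is available inside templated fields such as data
    data=json.dumps({"job_status": "{{ ti.xcom_pull(task_ids='update_job_status') }}"}),
    headers={"Content-Type": "application/json"},
    log_response=True,
)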
Execution logs:
Task_1:
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-20T23:15:15.226853+00:00
[2021-08-20 23:15:17,148] {logging_mixin.py:104} INFO - Task Id: job_submission_task
[2021-08-20 23:15:17,148] {python.py:151} INFO - Done. Returned value was: {'job_data': 'something'}
[2021-08-20 23:15:17,202] {taskinstance.py:1211} INFO - Marking task as SUCCESS.
Task_2:
AIRFLOW_CTX_DAG_ID=taskflow_previous_task
AIRFLOW_CTX_TASK_ID=update_job_status
AIRFLOW_CTX_EXECUTION_DATE=2021-08-20T23:15:15.226853+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-20T23:15:15.226853+00:00
[2021-08-20 23:15:18,768] {logging_mixin.py:104} INFO - Data from previous Task: something
[2021-08-20 23:15:18,792] {logging_mixin.py:104} INFO - Upstream_ti state: success
[2021-08-20 23:15:18,793] {python.py:151} INFO - Done. Returned value was: success
[2021-08-20 23:15:18,874] {taskinstance.py:1211} INFO - Marking task as SUCCESS.
Task_3:
AIRFLOW_CTX_DAG_ID=taskflow_previous_task
AIRFLOW_CTX_TASK_ID=post_op
AIRFLOW_CTX_EXECUTION_DATE=2021-08-20T23:15:15.226853+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-20T23:15:15.226853+00:00
[2021-08-20 23:15:21,201] {http.py:111} INFO - Calling HTTP method
[2021-08-20 23:15:21,228] {base.py:78} INFO - Using connection to: id: http_default. Host: https://www.httpbin.org, Port: None, Schema: , Login: , Password: None, extra: {}
[2021-08-20 23:15:21,245] {http.py:140} INFO - Sending 'POST' to url: https://www.httpbin.org/post
[2021-08-20 23:15:21,973] {http.py:115} INFO - {
"args": {},
"data": "{\"job_status\": \"success\"}",
"files": {},
"form": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "25",
"Content-Type": "application/json",
"Host": "www.httpbin.org",
"User-Agent": "python-requests/2.25.1",
"X-Amzn-Trace-Id": "Root=1-61203789-0136b7557ba4e0116bb5e16d"
},
"json": {
"job_status": "success"
},
"origin": "200.73.153.254",
"url": "https://www.httpbin.org/post"
}
[2021-08-20 23:15:22,027] {taskinstance.py:1211} INFO - Marking task as SUCCESS.
Let me know if this works for you. I tried to use as many TaskFlow features as possible.
Edit:
- Added trigger_rule='all_done' to the update_job_status task, so that it runs even if the upstream task does not succeed.