Airflow: How to pass data from a decorated task to SimpleHttpOperator?

I recently started working with Apache Airflow. I am using the TaskFlow API with a decorated task with the ID Get_payload and a SimpleHttpOperator. The Get_payload task fetches data from a database, does some data manipulation, and returns a dict as the payload.

Problem

I am unable to pass data from the previous task into the next one. Yes, I am aware of XComs, but the whole point of using the TaskFlow API is to avoid direct interaction with XComs. I get the following error when get_data is passed directly to the data attribute of SimpleHttpOperator.

airflow.exceptions.AirflowException: 400:BAD REQUEST
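
For reference, the pattern the TaskFlow API is meant to replace would pull the XCom explicitly through Jinja templating in the operator's templated data field. A rough sketch of that alternative, reusing the task_id from the DAG below:

ml_api = SimpleHttpOperator(
    task_id="some_api",
    http_conn_id="http_conn_id",
    method="POST",
    endpoint="/some-path",
    # data is a templated field, so the upstream return value can be
    # pulled from XCom explicitly when the template is rendered.
    data="{{ ti.xcom_pull(task_ids='Get_payload') }}",
    headers={"Content-Type": "application/json"},
)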

What have I tried so far?

As described elsewhere, I used template_fields in my custom sensor to define the field that expects data from the previous task. In the case of SimpleHttpOperator I cannot edit the operator to do the same. So how can this be solved with SimpleHttpOperator?
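
For illustration, making a field templated in a custom operator is just a matter of listing it in template_fields; a minimal sketch with a hypothetical operator class:

from airflow.models.baseoperator import BaseOperator


class MyCustomOperator(BaseOperator):
    # Fields listed here are rendered with Jinja before execute() runs,
    # so XCom references in template strings are resolved automatically.
    template_fields = ("data",)

    def __init__(self, data=None, **kwargs):
        super().__init__(**kwargs)
        self.data = data

    def execute(self, context):
        # By the time execute() is called, self.data holds the rendered value.
        self.log.info("Received payload: %s", self.data)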

I have also checked this SO answer and this one.

DAG:

from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator

from datetime import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
}


@dag(default_args=default_args, schedule_interval=None, tags=["Http Operators"])
def http_operator():
    @task(multiple_outputs=True)
    def Get_payload(**kwargs):
        # STEP 1: Get data from database.

        # STEP 2: Manipulate data.

        # STEP 3: Return payload.
        data = {
            "key_1": "Value 1",
            "key_2": "Value 2",
            "key_3": "Value 3",
            "key_4": "Value 4",
        }

        return data

    get_data = Get_payload()

    ml_api = SimpleHttpOperator(
        task_id="some_api",
        http_conn_id="http_conn_id",
        method="POST",
        endpoint="/some-path",
        data=get_data,
        headers={"Content-Type": "application/json"},
    )

    get_data >> ml_api


http_operator_dag = http_operator()

Full log:

[2021-08-28 20:28:12,947] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:1094} INFO - 
--------------------------------------------------------------------------------
[2021-08-28 20:28:12,971] {taskinstance.py:1095} INFO - Starting attempt 1 of 1
[2021-08-28 20:28:12,971] {taskinstance.py:1096} INFO - 
--------------------------------------------------------------------------------
[2021-08-28 20:28:12,982] {taskinstance.py:1114} INFO - Executing <Task(SimpleHttpOperator): clf_api> on 2021-08-28T20:28:10.265689+00:00
[2021-08-28 20:28:12,987] {standard_task_runner.py:52} INFO - Started process 19229 to run task
[2021-08-28 20:28:12,991] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'http_operator', 'clf_api', '2021-08-28T20:28:10.265689+00:00', '--job-id', '71', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/Http_Operator.py', '--cfg-path', '/tmp/tmp4l9hwi4q', '--error-file', '/tmp/tmpk1yrhtki']
[2021-08-28 20:28:12,993] {standard_task_runner.py:77} INFO - Job 71: Subtask clf_api
[2021-08-28 20:28:13,048] {logging_mixin.py:109} INFO - Running <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [running]> on host d332abee08c8
[2021-08-28 20:28:13,126] {taskinstance.py:1251} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=http_operator
AIRFLOW_CTX_TASK_ID=clf_api
AIRFLOW_CTX_EXECUTION_DATE=2021-08-28T20:28:10.265689+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-28T20:28:10.265689+00:00
[2021-08-28 20:28:13,128] {http.py:111} INFO - Calling HTTP method
[2021-08-28 20:28:13,141] {base.py:70} INFO - Using connection to: id: ML_API. Host: <IP-REMOVED>, Port: None, Schema: , Login: dexter, Password: ***, extra: {}
[2021-08-28 20:28:13,144] {http.py:140} INFO - Sending 'POST' to url: http://<IP-REMOVED>/classify
[2021-08-28 20:28:13,841] {http.py:154} ERROR - HTTP error: BAD REQUEST
[2021-08-28 20:28:13,842] {http.py:155} ERROR - <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>400 Bad Request</title>
<h1>Bad Request</h1>
<p>Failed to decode JSON object: Expecting value: line 1 column 1 (char 0)</p>

[2021-08-28 20:28:13,874] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 152, in check_response
    response.raise_for_status()
  File "/home/airflow/.local/lib/python3.8/site-packages/requests/models.py", line 953, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: BAD REQUEST for url: http://<IP-REMOVED>/classify

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/operators/http.py", line 113, in execute
    response = http.run(self.endpoint, self.data, self.headers, self.extra_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 141, in run
    return self.run_and_check(session, prepped_request, extra_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 198, in run_and_check
    self.check_response(response)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 156, in check_response
    raise AirflowException(str(response.status_code) + ":" + response.reason)
airflow.exceptions.AirflowException: 400:BAD REQUEST
[2021-08-28 20:28:13,882] {taskinstance.py:1505} INFO - Marking task as FAILED. dag_id=http_operator, task_id=clf_api, execution_date=20210828T202810, start_date=20210828T202812, end_date=20210828T202813
[2021-08-28 20:28:13,969] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-08-28 20:28:14,043] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

As suggested by @Josh Fell in the comments, there were two mistakes in my DAG:

  1. Wrap data in json.dumps(data) before returning it from Get_payload (see the sketch after this list).
  2. Remove multiple_outputs=True from the task decorator of Get_payload.
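
The first fix matters because the HTTP hook hands data straight to the requests library: a plain dict is form-encoded, while a pre-serialized JSON string is sent verbatim, which is what the Content-Type: application/json header promises; this is exactly why the server answered "Failed to decode JSON object". The second fix matters because multiple_outputs=True splits the returned dict into one XCom per key, whereas the operator needs the whole payload as a single value. A minimal sketch of the serialization difference, with example.com standing in for the real endpoint:

import json

import requests

payload = {"key_1": "Value 1"}

# A dict passed via data= is form-encoded (key_1=Value+1), which a
# JSON-expecting server rejects with "Failed to decode JSON object".
requests.post("http://example.com/classify", data=payload)

# A pre-serialized JSON string is sent as-is and matches the
# Content-Type: application/json header.
requests.post(
    "http://example.com/classify",
    data=json.dumps(payload),
    headers={"Content-Type": "application/json"},
)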

Final code:

import json

from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator

from datetime import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
}


@dag(default_args=default_args, schedule_interval=None, tags=["Http Operators"])
def http_operator():
    @task()
    def Get_payload(**kwargs):
        # STEP 1: Get data from database.

        # STEP 2: Manipulate data.

        # STEP 3: Return payload.
        data = {
            "key_1": "Value 1",
            "key_2": "Value 2",
            "key_3": "Value 3",
            "key_4": "Value 4",
        }

        return json.dumps(data)

    get_data = Get_payload()

    ml_api = SimpleHttpOperator(
        task_id="some_api",
        http_conn_id="http_conn_id",
        method="POST",
        endpoint="/some-path",
        data=get_data,
        headers={"Content-Type": "application/json"},
    )

    get_data >> ml_api  # Redundant: passing get_data as data= already sets this dependency.


http_operator_dag = http_operator()