Airflow DAG - 使用 SimpleHttpOperator 访问上下文以启用 XCOM 拉取

Airflow DAG - access context using SimpleHttpOperator to enable XCOM pull

我正在努力使用 SimpleHttpOperator 将 xcoms 拉入任务。

下面的 dag 旨在编排向第三方 API 发出的一些请求(通过 Google 云功能),将 csvs 存储在存储中,并最终访问所有 csvs,合并在一起,转换并存储在大查询中。

任务成对进行,对于每个报告,第一个任务触发 Cloud 函数生成请求并将报告令牌存储在 Secret Manager 中,第二个任务检查报告是否可供下载,重试直到成功,然后将其保存到 Google Cloud Storage。

一旦所有 CSV 都可用,最后一个任务将触发另一个云功能,从存储中下载所有 csv,合并并上传到 BQ。

当每个单独的下载完成后,我使用 SimpleHttpOperator 的 response_filter arg 使文件名可供以后用作 xcom。

# Python standard modules
from datetime import datetime, timedelta# Airflow modules
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
import json

default_args = {
    'owner': '--',
    'depends_on_past': False,
    # Start on 27th of June, 2020
    'start_date': datetime(2021, 6, 16),
    'email': ['--'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(seconds=60),
    'provide_context': True    
}

with DAG(
    "dailymotion_reporting",
    default_args=default_args,
    schedule_interval='0 6 * * *',
    tags=["my_dags"]
) as dag:    
    
    

    def push_xcom(**context):
        v = context['ti'].xcom_push(key="filename", value=response.json()["filename"])
        return v

    def response_check(response):
        if response[2] == "report not ready":
            print("report not ready: " + report_summary)
            return False
        elif response[2] == "report downloaded":
            print("report downloaded: " + report_summary)
            return True

    #t1 as request first report
    report1_request = SimpleHttpOperator(
        task_id= "report1_request",
        method='POST',
        http_conn_id='report_request_trigger',
        endpoint='request_dm_report',
        data=json.dumps({
                "dimensions": "DAY-VIDEO_ID-VIDEO_OWNER_CHANNEL_SLUG-VISITOR_DOMAIN_GROUP-VISITOR_SUBDOMAIN-VISITOR_DEVICE_TYPE-INVENTORY_POSITION", 
                "metrics": "TOTAL_INVENTORY", 
                "product": "EMBED"
        }),
        headers={"Content-Type": "application/json"}
    )
    #t2 check report availabilty until available then download
    report1_check_dl = SimpleHttpOperator(
        task_id= "report1_check_dl",
        method='GET',
        http_conn_id='report_request_trigger',
        endpoint='check_previously_requested_dm_reports',
        response_check = lambda response: True if response.json()["report_status"] == "report downloaded" else False,
        response_filter = lambda response: {"filename": response.json()["filename"]}
    )

将 csvs 从存储中拉出的任务如下。我正在尝试从之前的任务生成的 xcom 中检索文件名,并将它们包含在我的云功能的数据有效负载中。

ad_report_transformations = SimpleHttpOperator(
    task_id= "ad_report_transformations",
    method='POST',
    http_conn_id='report_request_trigger',
    endpoint='dm_transform_ad_data',
    data = json.dumps(" {{ context['ti'].xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl','report3_check_dl', 'report4_check_dl', 'report5_check_dl'], key=return_value.json()['filename']) }} "),
    response_check = lambda response: True if response == "ok" else False
)

然而,在尝试了许多不同的方法后,我不断收到相同错误的变体

{taskinstance.py:1152} ERROR - 'context' is undefined

我使用 SimpleHttpOperator 定义上下文的最佳方式是什么?还是有另一种方法来引入这些值?我见过的大多数类似问题的解决方案都使用 pythonOperator,它有一个 provide_context arg,它似乎启用了上述功能,但我想看看是否有办法让我做到这一点而不必重写我所有的任务作为功能。谢谢

为什么 json.dumps(

我相信如果你简单地传递 data = " {{ context['ti'].xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl','report3_check_dl', 'report4_check_dl', 'report5_check_dl'], key=return_value.json()['filename']) }} ",json.dumps 额外编码字符串(例如它使用 \'),我认为这会很好地工作,我认为这与 jinja 模板混淆。

在 Jinja 模板中检索 XCom 时,您不需要 context 对象,因为传递了上下文对象以在幕后呈现模板值。试试这个:

data="{{ ti.xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl','report3_check_dl', 'report4_check_dl', 'report5_check_dl'], key=return_value.json()['filename']) }}"

以上假设您实际上不需要json.dumps()