Airflow 2.0.2 如何使用 xcom 在 postgres 任务中传递参数?

Airflow 2.0.2 How to pass parameter within postgres tasks using xcom?

我正在尝试以动态方式在 postgres 运算符中传递参数。

刷新元数据有两个任务,

  1. 获取 id 列表 (get_query_id_task)

  2. 传递 ID 列表以获取并执行查询 (get_query_text_task)

    get_query_id_task = PythonOperator(
         task_id='get_query_id',
         python_callable=query_and_push,
         #provide_context=True,
         op_kwargs={
             'sql' : read_sql('warmupqueryid.sql')
                             }
                         )
    
    
     get_query_text_task= PostgresOperator(
         task_id='get_query_text',
         postgres_conn_id='redshift',
         trigger_rule=TriggerRule.ALL_DONE,
         params={'query_ids': " {{ ti.xcom_pull(task_ids='get_query_id_task', key='return_value') }}"},
         sql="""SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
                FROM stl_querytext
                WHERE query in {{ macros.custom_macros.render_list_sql(params.query_ids) }};""",
         )
    

Xcom推送return查询列表如下:

[(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)]

我已经使用插件来呈现 xcom 推送:

def render_list_sql(li):
l = []
for index, tup in enumerate(li):  
    idd = tup[0]
    l.append(idd)
return tuple(l)    
    

# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "custom_macros"
    macros = [render_list_sql]

第一个任务的模板渲染:

渲染的模板没有传递参数

Xcom推送值,是元组列表

问题已使用下面提供的解决方案解决。但是我无法对 ID 列表进行循环。所以它只是给我看一个ID。而且我不确定如何对 ID 进行循环。

这是日志:

*** Reading remote log from s3://ob-airflow-pre/logs/Redshift_warm-up/get_query_text/2021-05-27T20:31:05.036338+00:00/1.log.
[2021-05-27 20:31:07,435] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [queued]>
[2021-05-27 20:31:07,463] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [queued]>
[2021-05-27 20:31:07,463] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-05-27 20:31:07,463] {taskinstance.py:1069} INFO - Starting attempt 1 of 2
[2021-05-27 20:31:07,463] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-05-27 20:31:07,473] {taskinstance.py:1089} INFO - Executing <Task(PostgresOperator): get_query_text> on 2021-05-27T20:31:05.036338+00:00
[2021-05-27 20:31:07,476] {standard_task_runner.py:52} INFO - Started process 384 to run task
[2021-05-27 20:31:07,479] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'Redshift_warm-up', 'get_query_text', '2021-05-27T20:31:05.036338+00:00', '--job-id', '2045', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/redshift_warm-up_dag.py', '--cfg-path', '/tmp/tmp8n32exly', '--error-file', '/tmp/tmp0bdhn3lj']
[2021-05-27 20:31:07,479] {standard_task_runner.py:77} INFO - Job 2045: Subtask get_query_text
[2021-05-27 20:31:07,645] {logging_mixin.py:104} INFO - Running <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow.svc.cluster.local
[2021-05-27 20:31:07,747] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=Redshift_warm-up
AIRFLOW_CTX_TASK_ID=get_query_text
AIRFLOW_CTX_EXECUTION_DATE=2021-05-27T20:31:05.036338+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-27T20:31:05.036338+00:00
[2021-05-27 20:31:07,748] {postgres.py:69} INFO - Executing: SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
               FROM stl_querytext
               WHERE query in (19343160);
[2021-05-27 20:31:07,767] {base.py:69} INFO - Using connection to: id: redshift. Host: sys-redshift-pre.oneboxtickets.net, Port: 5439, Schema: reports, Login: mstr_new, Password: XXXXXXXX, extra: XXXXXXXX
[2021-05-27 20:31:07,792] {dbapi.py:180} INFO - Running statement: SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
               FROM stl_querytext
               WHERE query in (19343160);, parameters: None
[2021-05-27 20:31:08,727] {dbapi.py:186} INFO - Rows affected: 1
[2021-05-27 20:31:08,759] {taskinstance.py:1185} INFO - Marking task as SUCCESS. dag_id=Redshift_warm-up, task_id=get_query_text, execution_date=20210527T203105, start_date=20210527T203107, end_date=20210527T203108
[2021-05-27 20:31:08,806] {taskinstance.py:1246} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-05-27 20:31:08,814] {local_task_job.py:146} INFO - Task exited with return code 0

params 参数不是“模板化”的,因此它只会呈现字符串。所以将你的 param 直接移动到 SQL

    get_query_text_task= PostgresOperator(
        task_id='get_query_text',
        postgres_conn_id='redshift',
        trigger_rule=TriggerRule.ALL_DONE,
        sql="""SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
               FROM stl_querytext
               WHERE query in ({{ macros.custom_macros.render_list_sql( ti.xcom_pull(task_ids='get_query_id_task', key='return_value') }});""",
        )