airflow BigQueryOperator ERROR - 400 Syntax error: Unexpected token at [1:244] - while using params

I have 2 BigQueryOperator tasks inside a loop. The first task runs perfectly, but the second task (create_partition_table_agent_intensity_{v_list[i]}) throws the error:

ERROR - 400 Syntax error: Unexpected "{" at [1:244]

I don't understand what the difference between the tasks is. Maybe someone can point me in the right direction?

Here is my full code:

from airflow.models import (DAG, Variable)
import os

from airflow.operators.dummy import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

import datetime
import json
import pandas as pd

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

from google.cloud import bigquery
from airflow.contrib.hooks.bigquery_hook import BigQueryHook


from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator



default_args = {
    'start_date': datetime.datetime(2020, 1, 1),
}


PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "bigquery_default")
PROJECT_ID_GCP = os.environ.get("GCP_PROJECT_ID", "my_project")
DATASET_MRR = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "LP_RAW")
DATASET_STG = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "LP_STG")



MRR_AGENT_ACTIVITY = "RPT_FA_AGENT_ACTIVITY_VW"
MRR_AGENT_INTENSITY = "RPT_AGG_15M_MSG_AGENT_INTENSITY_VW"
STG_AGENT_ACTIVITY_PARTITIONED = "agent_acitivity_partitioned"
STG_AGENT_INTENSITY_PARTITIONED = "agent_intensity_partitioned"



def list_dates_in_df(ti):

        hook = BigQueryHook(bigquery_conn_id=PROJECT_ID,
                         use_legacy_sql=False)
        bq_client = bigquery.Client(project = hook._get_field("project"),
                                 credentials = hook._get_credentials())
        query = "select distinct(cast(PARTITION_KEY as string)) as PARTITION_KEY \
        FROM LP_MNG.PartitionStatusMonitoring\
        where SOURCE_TABLE in ('RPT_FA_AGENT_ACTIVITY_VW','RPT_AGG_15M_MSG_AGENT_INTENSITY_VW')\
        and IS_LOAD_COMPLETED = false;"
        df = bq_client.query(query).to_dataframe()
        res = df.values.tolist()
        # flatten the list of one-element row lists into a flat list of partition keys
        my_list = [item for l in res for item in l]
        ti.xcom_push(key = 'list_of_dates', value = my_list)


def update_variable(ti):
        updated_file_list = ti.xcom_pull(key = 'list_of_dates',task_ids='list_dates')
        Variable.set(key="updated_dates", value=json.dumps(updated_file_list))
        print(updated_file_list)
        print(type(updated_file_list))
    

with DAG(
        'test_with_mng_table_list',                     
        schedule_interval=None,                 
        catchup = False,                 
        default_args=default_args
        ) as dag:
    
        

        list_dates = PythonOperator(
                task_id ='list_dates',
                python_callable = list_dates_in_df
                )


        set_list = PythonOperator(
                             task_id= 'set_list',
                             python_callable=update_variable
                            )


        v_list = Variable.get("updated_dates", deserialize_json=True)
        


        end_job = BashOperator(
        task_id='end_job',
        bash_command='echo end_job.',
        trigger_rule = 'all_done', )


for i in range(len(v_list)):
    create_partition_table_agent_activity = BigQueryOperator(
    task_id=f"create_partition_table_agent_activity_{v_list[i]}",
    sql="select ACCOUNT_ID,timestamp_trunc(CHANGE_EVENT_TIME_15M,HOUR) as ANALYSIS_DATE,\
        AGENT_ID,AGENT_GROUP_ID,USER_TYPE_ID,\
        sum(AWAY_ENGAGED_TIME) AWAY_ENGAGED_TIME,sum(BACKIN5_ENGAGED_TIME) BACKIN5_ENGAGED_TIME,\
        sum(DURATION_DAYS) DURATION_DAYS,sum(ONLINE_TIME) ONLINE_TIME,\
        sum(BACK_IN_5_TIME) BACK_IN_5_TIME,sum(AWAY_TIME) AWAY_TIME\
        from {{ params.PROJECT_ID }}.{{ params.DATASET_MRR }}.{{ params.MRR1 }}\
        where cast(CHANGE_EVENT_TIME_15M as STRING FORMAT 'YYYY-MM-DD') = cast('{{ params.date_a }}' as STRING)  \
        group by 1,2,3,4,5;",
    params={"PROJECT_ID":PROJECT_ID_GCP ,
           "DATASET_MRR":DATASET_MRR,
            "MRR1":MRR_AGENT_ACTIVITY,
            "date_a" : v_list[i]
            },
    destination_dataset_table=f"{PROJECT_ID_GCP}.{DATASET_STG}.{STG_AGENT_ACTIVITY_PARTITIONED}{v_list[i]}",
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    #bigquery_conn_id=CONNECTION_ID,
    use_legacy_sql=False,
    dag=dag
    )


    create_partition_table_agent_intensity = BigQueryOperator(
    task_id=f"create_partition_table_agent_intensity_{v_list[i]}",
    sql=f"select ACCOUNT_ID,timestamp_trunc(AGG_DATE,HOUR) as ANALYSIS_DATE,\
        AGENT_ID, GROUP_ID as AGENT_GROUP_ID,\
        USER_TYPE_ID, SUM(SUM_CONVERSATION_LOAD_RATE) as SUM_CONVERSATION_LOAD_RATE,\
        SUM(NO_EVENTS) AS NO_EVENTS\
        from {{ params.PROJECT_ID }}.{{ params.DATASET_MRR }}.{{ params.MRR2 }}\
        where cast(AGG_DATE as STRING FORMAT 'YYYY-MM-DD') = cast('{{ params.date_a }}' as STRING)  \
        group by 1,2,3,4,5;",
    params={"PROJECT_ID":PROJECT_ID_GCP ,
           "DATASET_MRR":DATASET_MRR,
            "MRR2":MRR_AGENT_INTENSITY,
            "date_a" : v_list[i]
            },
    destination_dataset_table=f"{PROJECT_ID_GCP}.{DATASET_STG}.{STG_AGENT_INTENSITY_PARTITIONED}{v_list[i]}",
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    #bigquery_conn_id=CONNECTION_ID,
    use_legacy_sql=False,
    dag=dag
    )

    d2 = DummyOperator(task_id='generate_data_{0}'.format(v_list[i]),dag=dag)

    list_dates >> set_list  >> [
    create_partition_table_agent_activity,create_partition_table_agent_intensity
    ] >> d2 >> end_job


I don't have a playground to test it, but I don't think you should use an f-string for the sql argument. If you write {{ something }} inside an f-string, Python collapses the doubled braces and returns the string { something }, so the Jinja template is never rendered and the query params are not inserted. The query then runs with a literal { in it, which is exactly the SQL syntax error you are getting. Try removing the f in front of the sql string in the second task.
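
Here is a minimal sketch of the difference in plain Python (the jinja2 call only mimics what Airflow does to templated fields; the shortened query and the param values are illustrative):

from jinja2 import Template

# In an f-string, '{{' and '}}' are escape sequences for literal braces,
# so the Jinja placeholders are destroyed before Airflow ever renders them.
plain = "select 1 from {{ params.PROJECT_ID }}.{{ params.DATASET_MRR }}.{{ params.MRR2 }}"
fstr = f"select 1 from {{ params.PROJECT_ID }}.{{ params.DATASET_MRR }}.{{ params.MRR2 }}"

print(fstr)
# select 1 from { params.PROJECT_ID }.{ params.DATASET_MRR }.{ params.MRR2 }

params = {"PROJECT_ID": "my_project", "DATASET_MRR": "LP_RAW",
          "MRR2": "RPT_AGG_15M_MSG_AGENT_INTENSITY_VW"}

# Jinja renders the double braces in the plain string...
print(Template(plain).render(params=params))
# select 1 from my_project.LP_RAW.RPT_AGG_15M_MSG_AGENT_INTENSITY_VW

# ...but leaves the single braces produced by the f-string untouched, so a
# literal '{' reaches BigQuery and triggers the 400 'Unexpected "{"' error.
print(Template(fstr).render(params=params))
# select 1 from { params.PROJECT_ID }.{ params.DATASET_MRR }.{ params.MRR2 }

With the f removed, the second task's sql is templated exactly like the first task's, and the params are substituted before the query reaches BigQuery.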