airflow BigQueryOperator ERROR - 400 Syntax error: Unexpected token at [1:244] - while using params
I have 2 BigQueryOperator tasks in a loop. The first task works perfectly, but the second task (create_partition_table_agent_intensity_{v_list[i]}) throws the error:
ERROR - 400 Syntax error: Unexpected "{" at [1:244]
I don't understand what the difference between the tasks is. Maybe someone can point me in the right direction?
Here is my full code:
from airflow.models import (DAG, Variable)
import os
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import datetime
import json
import pandas as pd
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from google.cloud import bigquery
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator
default_args = {
    'start_date': datetime.datetime(2020, 1, 1),
}
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "bigquery_default")
PROJECT_ID_GCP = os.environ.get("GCP_PROJECT_ID", "my_project")
DATASET_MRR = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "LP_RAW")
DATASET_STG = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "LP_STG")
MRR_AGENT_ACTIVITY = "RPT_FA_AGENT_ACTIVITY_VW"
MRR_AGENT_INTENSITY = "RPT_AGG_15M_MSG_AGENT_INTENSITY_VW"
STG_AGENT_ACTIVITY_PARTITIONED = "agent_acitivity_partitioned"
STG_AGENT_INTENSITY_PARTITIONED = "agent_intensity_partitioned"
def list_dates_in_df(ti):
    hook = BigQueryHook(bigquery_conn_id=PROJECT_ID,
                        use_legacy_sql=False)
    bq_client = bigquery.Client(project=hook._get_field("project"),
                                credentials=hook._get_credentials())
    query = "select distinct(cast(PARTITION_KEY as string)) as PARTITION_KEY \
             FROM LP_MNG.PartitionStatusMonitoring \
             where SOURCE_TABLE in ('RPT_FA_AGENT_ACTIVITY_VW','RPT_AGG_15M_MSG_AGENT_INTENSITY_VW') \
             and IS_LOAD_COMPLETED = false;"
    df = bq_client.query(query).to_dataframe()
    res = df.values.tolist()
    # unpack the list of lists: l is a list inside res; take each item out of every l
    my_list = [item for l in res for item in l]
    ti.xcom_push(key='list_of_dates', value=my_list)
def update_variable(ti):
    updated_file_list = ti.xcom_pull(key='list_of_dates', task_ids='list_dates')
    Variable.set(key="updated_dates", value=json.dumps(updated_file_list))
    print(updated_file_list)
    print(type(updated_file_list))
with DAG(
    'test_with_mng_table_list',
    schedule_interval=None,
    catchup=False,
    default_args=default_args
) as dag:
    list_dates = PythonOperator(
        task_id='list_dates',
        python_callable=list_dates_in_df
    )
    set_list = PythonOperator(
        task_id='set_list',
        python_callable=update_variable
    )
    v_list = Variable.get("updated_dates", deserialize_json=True)
    end_job = BashOperator(
        task_id='end_job',
        bash_command='echo end_job.',
        trigger_rule='all_done',
    )
    for i in range(len(v_list)):
        create_partition_table_agent_activity = BigQueryOperator(
            task_id=f"create_partition_table_agent_activity_{v_list[i]}",
            sql="select ACCOUNT_ID, timestamp_trunc(CHANGE_EVENT_TIME_15M, HOUR) as ANALYSIS_DATE, \
                 AGENT_ID, AGENT_GROUP_ID, USER_TYPE_ID, \
                 sum(AWAY_ENGAGED_TIME) AWAY_ENGAGED_TIME, sum(BACKIN5_ENGAGED_TIME) BACKIN5_ENGAGED_TIME, \
                 sum(DURATION_DAYS) DURATION_DAYS, sum(ONLINE_TIME) ONLINE_TIME, \
                 sum(BACK_IN_5_TIME) BACK_IN_5_TIME, sum(AWAY_TIME) AWAY_TIME \
                 from {{ params.PROJECT_ID }}.{{ params.DATASET_MRR }}.{{ params.MRR1 }} \
                 where cast(CHANGE_EVENT_TIME_15M as STRING FORMAT 'YYYY-MM-DD') = cast('{{ params.date_a }}' as STRING) \
                 group by 1,2,3,4,5;",
            params={"PROJECT_ID": PROJECT_ID_GCP,
                    "DATASET_MRR": DATASET_MRR,
                    "MRR1": MRR_AGENT_ACTIVITY,
                    "date_a": v_list[i]
                    },
            destination_dataset_table=f"{PROJECT_ID_GCP}.{DATASET_STG}.{STG_AGENT_ACTIVITY_PARTITIONED}{v_list[i]}",
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_TRUNCATE',
            #bigquery_conn_id=CONNECTION_ID,
            use_legacy_sql=False,
            dag=dag
        )
        create_partition_table_agent_intensity = BigQueryOperator(
            task_id=f"create_partition_table_agent_intensity_{v_list[i]}",
            sql=f"select ACCOUNT_ID, timestamp_trunc(AGG_DATE, HOUR) as ANALYSIS_DATE, \
                 AGENT_ID, GROUP_ID as AGENT_GROUP_ID, \
                 USER_TYPE_ID, SUM(SUM_CONVERSATION_LOAD_RATE) as SUM_CONVERSATION_LOAD_RATE, \
                 SUM(NO_EVENTS) AS NO_EVENTS \
                 from {{ params.PROJECT_ID }}.{{ params.DATASET_MRR }}.{{ params.MRR2 }} \
                 where cast(AGG_DATE as STRING FORMAT 'YYYY-MM-DD') = cast('{{ params.date_a }}' as STRING) \
                 group by 1,2,3,4,5;",
            params={"PROJECT_ID": PROJECT_ID_GCP,
                    "DATASET_MRR": DATASET_MRR,
                    "MRR2": MRR_AGENT_INTENSITY,
                    "date_a": v_list[i]
                    },
            destination_dataset_table=f"{PROJECT_ID_GCP}.{DATASET_STG}.{STG_AGENT_INTENSITY_PARTITIONED}{v_list[i]}",
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_TRUNCATE',
            #bigquery_conn_id=CONNECTION_ID,
            use_legacy_sql=False,
            dag=dag
        )
        d2 = DummyOperator(task_id='generate_data_{0}'.format(v_list[i]), dag=dag)
        list_dates >> set_list >> [
            create_partition_table_agent_activity, create_partition_table_agent_intensity
        ] >> d2 >> end_job
I don't have a playground to test it, but I think you shouldn't use an f-string for the sql argument. If you write {{ something }} inside an f-string, Python treats the doubled braces as an escape and returns the string { something }, so the Jinja placeholders are never rendered and the query runs with the literal braces, which causes the SQL syntax error. Try removing the f in front of the sql string in the second task.
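A minimal sketch of the brace behaviour (plain Python; the table name is only illustrative):

# Inside an f-string, doubled braces are an escape for one literal brace,
# so the Jinja placeholder is destroyed when the DAG file is parsed:
sql = f"select * from {{ params.MRR2 }}"
print(sql)  # select * from { params.MRR2 }

# Without the f prefix the doubled braces survive, and Airflow's Jinja
# templating can later substitute the value from params:
sql = "select * from {{ params.MRR2 }}"
print(sql)  # select * from {{ params.MRR2 }}

Jinja does not recognize { params.MRR2 } (single braces), so it leaves the text as-is and BigQuery receives a raw {, which matches the reported Unexpected "{" error. The fix should be as simple as changing sql=f"select ..." to sql="select ..." in the second task; the first task already uses a plain string, which is why it renders correctly.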