Google Dataflow API call failure
I am trying to trigger a Dataflow job from Composer (Airflow) using the Dataflow operator, but the call fails with the following error:
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataflow.googleapis.com/v1b3/projects/project/locations/europe-west2/templates:launch?gcsPath=gs%3A%2F%2Fvdm_dev_dataflow_daily_upload%2Fdataflow_daily_upload%2Fbigquery_to_bigtable&alt=json returned "Invalid value at 'launch_parameters.parameters' (type.googleapis.com/google.dataflow.v1beta3.LaunchTemplateParameters.ParametersEntry),
"{'allowed_features': 'SELECT distinct data\nFROM project.dataset.table1\nWHERE ace_date="2022-05-12"',
'event_to_programme_mapping': 'SELECT distinct data\nFROM project.dataset.table2\nWHERE ace_date="2022-05-12"',
'priority_data': 'SELECT distinct data\nFROM project.dataset.table3\nWHERE ace_date="2022-05-12"',
'programme_to_event_mapping': 'SELECT distinct data\nFROM project.dataset.table4\nWHERE ace_date="2022-05-12"',
'text_rules': 'SELECT distinct data\nFROM project.dataset.table5\nWHERE ace_date="2022-05-12"',
'table_rules': 'SELECT distinct data\nFROM project.dataset.tabl6\nWHERE ace_date="2022-05-12"',
'pack_rules': 'SELECT distinct data\nFROM project.dataset.table7\nWHERE ace_date="2022-05-12"',
'pricing': 'SELECT distinct row_key_data as data\nFROM peoject.dataset.table7\nWHERE date_of_run="2022-05-16"'}">
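The 400 indicates that the `parameters` field of the `templates:launch` request must be a flat map of string keys to string values, one entry per template parameter, whereas here the whole dict was sent as a single string value. A minimal stdlib-only sketch of the expected shape (the job name and query text are illustrative, not from the actual project):

```python
import json

# Hypothetical launch body: LaunchTemplateParameters.parameters is a
# flat str -> str map, one entry per template parameter.
launch_body = {
    "jobName": "bigquery-to-bigtable-daily",  # illustrative name
    "parameters": {
        "allowed_features": (
            'SELECT distinct data FROM project.dataset.table1 '
            'WHERE ace_date="2022-05-12"'
        ),
    },
}

# Round-trip through JSON: "parameters" must survive as an object,
# not as the string repr of a dict.
encoded = json.dumps(launch_body)
assert isinstance(json.loads(encoded)["parameters"], dict)
```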
Below is the code that calls it from Airflow:
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)


def dataflow_trigger(task):
    """Dynamic task for calling a Dataflow job."""
    return DataflowTemplatedJobStartOperator(
        task_id=task,
        project_id="{{task_instance.xcom_pull(key='dataflow_settings', task_ids='get_settings')['project']}}",
        job_name="{{task_instance.xcom_pull(key='dataflow_settings', task_ids='get_settings')['job_name']}}",
        template="{{task_instance.xcom_pull(key='dataflow_settings', task_ids='get_settings')['template_path']}}",
        parameters="{{task_instance.xcom_pull(key='parameters', task_ids='get_settings')}}",
        location='europe-west2',
    )
The root cause was that the XCom push stores the parameters dict as a string; setting render_template_as_native_obj=True in the DAG constructor fixed the problem.
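The failure mode can be reproduced without Airflow: with the default rendering, Jinja returns the templated `parameters` value as the *string representation* of the dict, which the Dataflow API rejects. A small stdlib-only sketch of the difference, using `ast.literal_eval` to stand in for what `render_template_as_native_obj=True` does (the dict content is illustrative):

```python
import ast

# What the default Jinja rendering produces: the dict's repr as one string.
rendered_as_string = (
    "{'allowed_features': 'SELECT distinct data FROM project.dataset.table1'}"
)
assert isinstance(rendered_as_string, str)  # the API rejects this shape

# render_template_as_native_obj=True makes Jinja hand back the native
# Python object instead; ast.literal_eval simulates that recovery here.
rendered_as_native = ast.literal_eval(rendered_as_string)
assert isinstance(rendered_as_native, dict)
assert rendered_as_native["allowed_features"].startswith("SELECT")
```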