使用 BigQueryInsertJobOperator 从 UI 手动触发 DAG 时出现错误 "Invalid job id"

Error "Invalid job id" when DAG is triggered manually from UI using BigQueryInsertJobOperator

我正在尝试使用“BigQueryInsertJobOperator”从作曲家触发存储过程。 Dag 在按计划触发时运行良好,但在从 Airflow UI 手动触发时失败并出现以下错误。 无效的 jobIDairflow_DQ_create_stored_procedure_2021_11_02T01_36_02.229065_00_00_XXXXXXXXXXXXXXXXXX”。作业 ID 必须是字母数字(加上下划线和破折号),并且长度最多为 1024 个字符。 创建的作业 ID 中只有字母数字和“_”。它的长度不是 1024 个字符。 手动触发和计划触发都应该是可能的。请帮忙!

编辑-1: 如果我们从最后配置作业 ID,它就可以工作。但是,如果 BQ 自动生成作业 ID,它会抛出错误。

下面是代码片段:

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())


default_dag_args = {
    'start_date': yesterday
}



with models.DAG(
'DQ_create',
schedule_interval ='@daily',
default_args = default_dag_args
) as dag:        

    Stored_Procedure = BigQueryInsertJobOperator(
        task_id='stored_procedure',
        configuration={
        "query": {
                "query": "CALL `project.dataset.procedure`() ",
                "useLegacySql": False,
            }
},
)

Stored_Procedure```

您在 BigQueryInsertJobOperator class 中发现了一个错误。 class 负责构造 job_id,它通过获取 DAG 变量的值并将它们连接起来来实现:

  exec_date = context['execution_date'].isoformat()
  job_id = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{uniqueness_suffix}"

您使用的“execution_date”似乎具有毫秒级精度,这使得 exec_date 变量中有一个句点。

你有四个选项来解决这个问题:

  1. 创建 BigQueryInsertJobOperator 的子class,t运行在调用 super().execute(context) 之前将其 execution_date 归类到第二个。生成的 job_id 字符串将没有句点。
  2. 创建 BigQueryInsertJobOperator 的子class,它在调用 super().execute(context)
  3. 之前动态生成一个唯一的 job_id
  4. 将你的 DAG 安排到 运行 分钟开始(或至少是秒开始!),从而迫使你的 execution_date 没有毫秒精度
  5. 升级到 Airflow v2.0。他们通过删除句点修复了这个错误。(https://github.com/apache/airflow/commit/47b05a87f004dc273a4757ba49f03808a86f77e7#diff-529929b4ca60ce73b8da0f45d8a5c43c2d4e391b913fe78b39892899f812951e)