使用 BigQueryInsertJobOperator 从 UI 手动触发 DAG 时出现错误 "Invalid job id"
Error "Invalid job id" when DAG is triggered manually from UI using BigQueryInsertJobOperator
我正在尝试使用“BigQueryInsertJobOperator”从作曲家触发存储过程。 Dag 在按计划触发时运行良好,但在从 Airflow UI 手动触发时失败并出现以下错误。
无效的 jobIDairflow_DQ_create_stored_procedure_2021_11_02T01_36_02.229065_00_00_XXXXXXXXXXXXXXXXXX”。作业 ID 必须是字母数字(加上下划线和破折号),并且长度最多为 1024 个字符。
创建的作业 ID 中只有字母数字和“_”。它的长度不是 1024 个字符。
手动触发和计划触发都应该是可能的。请帮忙!
编辑-1:
如果我们从最后配置作业 ID,它就可以工作。但是,如果 BQ 自动生成作业 ID,它会抛出错误。
下面是代码片段:
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
'start_date': yesterday
}
with models.DAG(
'DQ_create',
schedule_interval ='@daily',
default_args = default_dag_args
) as dag:
Stored_Procedure = BigQueryInsertJobOperator(
task_id='stored_procedure',
configuration={
"query": {
"query": "CALL `project.dataset.procedure`() ",
"useLegacySql": False,
}
},
)
Stored_Procedure```
您在 BigQueryInsertJobOperator class 中发现了一个错误。 class 负责构造 job_id,它通过获取 DAG 变量的值并将它们连接起来来实现:
exec_date = context['execution_date'].isoformat()
job_id = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{uniqueness_suffix}"
您使用的“execution_date”似乎具有毫秒级精度,这使得 exec_date
变量中有一个句点。
你有四个选项来解决这个问题:
- 创建 BigQueryInsertJobOperator 的子class,t运行在调用
super().execute(context)
之前将其 execution_date 归类到第二个。生成的 job_id 字符串将没有句点。
- 创建 BigQueryInsertJobOperator 的子class,它在调用
super().execute(context)
之前动态生成一个唯一的 job_id
- 将你的 DAG 安排到 运行 分钟开始(或至少是秒开始!),从而迫使你的 execution_date 没有毫秒精度
- 升级到 Airflow v2.0。他们通过删除句点修复了这个错误。(https://github.com/apache/airflow/commit/47b05a87f004dc273a4757ba49f03808a86f77e7#diff-529929b4ca60ce73b8da0f45d8a5c43c2d4e391b913fe78b39892899f812951e)
我正在尝试使用“BigQueryInsertJobOperator”从作曲家触发存储过程。 Dag 在按计划触发时运行良好,但在从 Airflow UI 手动触发时失败并出现以下错误。 无效的 jobIDairflow_DQ_create_stored_procedure_2021_11_02T01_36_02.229065_00_00_XXXXXXXXXXXXXXXXXX”。作业 ID 必须是字母数字(加上下划线和破折号),并且长度最多为 1024 个字符。 创建的作业 ID 中只有字母数字和“_”。它的长度不是 1024 个字符。 手动触发和计划触发都应该是可能的。请帮忙!
编辑-1: 如果我们从最后配置作业 ID,它就可以工作。但是,如果 BQ 自动生成作业 ID,它会抛出错误。
下面是代码片段:
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
'start_date': yesterday
}
with models.DAG(
'DQ_create',
schedule_interval ='@daily',
default_args = default_dag_args
) as dag:
Stored_Procedure = BigQueryInsertJobOperator(
task_id='stored_procedure',
configuration={
"query": {
"query": "CALL `project.dataset.procedure`() ",
"useLegacySql": False,
}
},
)
Stored_Procedure```
您在 BigQueryInsertJobOperator class 中发现了一个错误。 class 负责构造 job_id,它通过获取 DAG 变量的值并将它们连接起来来实现:
exec_date = context['execution_date'].isoformat()
job_id = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{uniqueness_suffix}"
您使用的“execution_date”似乎具有毫秒级精度,这使得 exec_date
变量中有一个句点。
你有四个选项来解决这个问题:
- 创建 BigQueryInsertJobOperator 的子class,t运行在调用
super().execute(context)
之前将其 execution_date 归类到第二个。生成的 job_id 字符串将没有句点。 - 创建 BigQueryInsertJobOperator 的子class,它在调用
super().execute(context)
之前动态生成一个唯一的 job_id
- 将你的 DAG 安排到 运行 分钟开始(或至少是秒开始!),从而迫使你的 execution_date 没有毫秒精度
- 升级到 Airflow v2.0。他们通过删除句点修复了这个错误。(https://github.com/apache/airflow/commit/47b05a87f004dc273a4757ba49f03808a86f77e7#diff-529929b4ca60ce73b8da0f45d8a5c43c2d4e391b913fe78b39892899f812951e)