在 'dag_id' 中键入 'name of dag' 的重复条目
Duplicate entry for 'name of dag' for key in 'dag_id'
我正在 运行使用 TriggerDagRunOperator 使用不同的负载触发另一个 dag 两次。
第一个开始运行,但是第二个总是失败说:
sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'external_dag-2021-11-01 00:00:00.000000' for key 'dag_id'"
两个 TriggerDagRunOperator 都有不同的 execution_dates 和“reset_dag_run”设置为 true:
x = TriggerDagRunOperator(
task_id="x_external_dag",
trigger_dag_id="external_dag",
python_callable= pass_args_for_x,
execution_date="{{ execution_date }}",
reset_dag_run = True
)
y = TriggerDagRunOperator(
task_id="y_external_dag",
trigger_dag_id="external_dag",
python_callable=pass_args_for_y,
execution_date="{{ ds }}",
reset_dag_run = True
)
我 运行 不知道如何解决这个问题。任何帮助将非常感激。
问候!
我认为这个错误与两个 DagRuns 被同一个 execution_date
触发有关。 "{{ ds }}"
和"{{ execution_date }}"
是同一个值,不同类型的不同表示,str
和DateTime
。还要注意 {{ execution_date }}
变量很快就会变成 deprecated.
要解决错误,请尝试在任务定义期间不定义 execution_date
。 Docstrings 没有指定它,但默认值实际上是 None
。看一下TriggerDagRunOperator的execute() method,会发现when没有定义,被触发的DAG的execution_date
设置为timezone.utcnow()
.
按照这条路径,我认为您也不需要设置 reset_dag_run = True
。我自己没有测试过,但应该是:
x = TriggerDagRunOperator(
task_id="x_external_dag",
trigger_dag_id="external_dag",
python_callable= pass_args_for_x
)
y = TriggerDagRunOperator(
task_id="y_external_dag",
trigger_dag_id="external_dag",
python_callable=pass_args_for_y
)
让我知道它是否适合你。
我正在 运行使用 TriggerDagRunOperator 使用不同的负载触发另一个 dag 两次。
第一个开始运行,但是第二个总是失败说:
sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'external_dag-2021-11-01 00:00:00.000000' for key 'dag_id'"
两个 TriggerDagRunOperator 都有不同的 execution_dates 和“reset_dag_run”设置为 true:
x = TriggerDagRunOperator(
task_id="x_external_dag",
trigger_dag_id="external_dag",
python_callable= pass_args_for_x,
execution_date="{{ execution_date }}",
reset_dag_run = True
)
y = TriggerDagRunOperator(
task_id="y_external_dag",
trigger_dag_id="external_dag",
python_callable=pass_args_for_y,
execution_date="{{ ds }}",
reset_dag_run = True
)
我 运行 不知道如何解决这个问题。任何帮助将非常感激。 问候!
我认为这个错误与两个 DagRuns 被同一个 execution_date
触发有关。 "{{ ds }}"
和"{{ execution_date }}"
是同一个值,不同类型的不同表示,str
和DateTime
。还要注意 {{ execution_date }}
变量很快就会变成 deprecated.
要解决错误,请尝试在任务定义期间不定义 execution_date
。 Docstrings 没有指定它,但默认值实际上是 None
。看一下TriggerDagRunOperator的execute() method,会发现when没有定义,被触发的DAG的execution_date
设置为timezone.utcnow()
.
按照这条路径,我认为您也不需要设置 reset_dag_run = True
。我自己没有测试过,但应该是:
x = TriggerDagRunOperator(
task_id="x_external_dag",
trigger_dag_id="external_dag",
python_callable= pass_args_for_x
)
y = TriggerDagRunOperator(
task_id="y_external_dag",
trigger_dag_id="external_dag",
python_callable=pass_args_for_y
)
让我知道它是否适合你。