在 'dag_id' 中键入 'name of dag' 的重复条目

Duplicate entry for 'name of dag' for key in 'dag_id'

我正在 运行使用 TriggerDagRunOperator 使用不同的负载触发另一个 dag 两次。

第一个开始运行,但是第二个总是失败说:

sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'external_dag-2021-11-01 00:00:00.000000' for key 'dag_id'"

两个 TriggerDagRunOperator 都有不同的 execution_dates 和“reset_dag_run”设置为 true:

x = TriggerDagRunOperator(
    task_id="x_external_dag",
    trigger_dag_id="external_dag",
    python_callable= pass_args_for_x,
    execution_date="{{ execution_date }}",
    reset_dag_run = True
)
y = TriggerDagRunOperator(
    task_id="y_external_dag",
    trigger_dag_id="external_dag",
    python_callable=pass_args_for_y,
    execution_date="{{ ds }}",
    reset_dag_run = True
)

我 运行 不知道如何解决这个问题。任何帮助将非常感激。 问候!

我认为这个错误与两个 DagRuns 被同一个 execution_date 触发有关。 "{{ ds }}""{{ execution_date }}"是同一个值,不同类型的不同表示,strDateTime。还要注意 {{ execution_date }} 变量很快就会变成 deprecated.

要解决错误,请尝试在任务定义期间不定义 execution_date。 Docstrings 没有指定它,但默认值实际上是 None。看一下TriggerDagRunOperatorexecute() method,会发现when没有定义,被触发的DAG的execution_date设置为timezone.utcnow().

按照这条路径,我认为您也不需要设置 reset_dag_run = True。我自己没有测试过,但应该是:

x = TriggerDagRunOperator(
    task_id="x_external_dag",
    trigger_dag_id="external_dag",
    python_callable= pass_args_for_x

)
y = TriggerDagRunOperator(
    task_id="y_external_dag",
    trigger_dag_id="external_dag",
    python_callable=pass_args_for_y
)

让我知道它是否适合你。