Airflow 2.0 - 调度程序无法在 serialized_dag table 中找到序列化 DAG

Airflow 2.0 - Scheduler is unable to find serialized DAG in the serialized_dag table

我在 dags 目录中有 2 个文件 - dag_1.py 和 dag_2.py

dag_1.py 创建静态 DAG,dag_2.py 在某个位置基于外部 json 文件创建动态 DAG。

静态 DAG(由 dag_1.py 创建)在后期包含一个任务,该任务为 dag_2.py 生成一些输入 json 文件,动态 DAG 就是以这种方式创建的.

这过去适用于未使用 DAG 序列化的 Airflow 1.x 版本。但是在 Airflow 2.0 中,DAG 序列化已经成为强制性的。有时,生成动态 DAG 时,我会在调度程序中遇到以下异常 -

[2021-01-02 06:17:39,493] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
    self._run_scheduler_loop()
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
    self._create_dag_runs(query.all(), session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
    dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table

在此之后,调度程序将终止,这是预期的。 当我在此错误后手动检查 table 时,我能够在其中看到 DAG 条目。

此问题并非始终可重现。这可能是什么原因?有没有我应该尝试调整的 Airflow 配置?

没有足够的代表发表评论,所以不得不留下答案,但是:

  1. 这是全新的 2.0 安装还是旧 1.10.x 实例的升级?和
  2. 你在回收名字吗?

我真的遇到了这个问题(我发现这个问题是谷歌搜索看看还有谁在同一条船上)。

在我的例子中,它是一个升级的现有 1.10.x 安装,虽然 dags 是动态生成的,但名称是回收的。我在 GUI 中单击 dag 时遇到错误 并且 它正在杀死调度程序。

Turns Out(TM),使用 GUI 概览中的 'trashcan' 按钮完全删除 dags 并让它们重新生成修复它(例如,问题立即消失并且在最后一次没有再次出现30 分钟)。

我猜测,在 db upgrade 步骤中,动态 dag 的某些方面可能没有正确迁移,然后将它们清除并让它们完全重新生成就解决了这个问题。显然,您会丢失所有历史记录等,但是(至少在我的情况下)这不一定是什么大问题。

按以下顺序更新后我们遇到了同样的问题:

  1. 1.10.12 -> 1.10.14
  2. 1.10.14 -> 2.0.0

我已经按照他们的指南进行操作,直到几个小时后调度程序开始崩溃并抱怨在数据库中找不到随机 DAG 后的某个随机点,我们才遇到任何问题。

我们的部署过程涉及清除 /opt/airflow/dags 文件夹并每次执行全新安装(我们将 dag 和支持代码存储在 python 包中)

所以在 1.10.x 版本中,我们偶尔会遇到调度程序解析空文件夹并从数据库中删除序列化 dags 的情况,但它总是能够在下一次解析时恢复图片

显然在 2.0 中,作为使调度器 HA 工作的一部分,他们将 DAG 处理器和调度器完全分离。这会导致竞争条件:

  • 如果调度程序作业在 之前 DAG 处理器更新了 serialized_dag table 值,它发现什么都没有和崩溃
  • 如果运气好的话,上面的情况就不会发生,你也不会看到这个异常

为了解决这个问题,我通过在数据库中更新 is_paused 来禁用所有 DAG 的调度,重新启动调度程序,一旦它生成序列化的 dag,就将所有 dag 重新打开

我在 https://github.com/apache/airflow/pull/13893 中解决了这个问题,它将作为 Airflow 2.0.1 的一部分发布。

将在下周(2021 年 2 月 8 日 - 最有可能)发布 Airflow 2.0.1。

选择的答案对我不起作用(在我的头上敲了几个小时之后)。 这是有效的:

只要到后端数据库(postgresql)删除所有关于logs, task instances, faild task and ...的记录 但不要删除主表(如果你不能分辨差异,只需删除我提到的表)

然后去做airflow db init

似乎有关过时和已删除的 dag 和任务的旧数据确实会使气流变得一团糟。删除混乱,让它工作。