Airflow 2.0 - 调度程序无法在 serialized_dag table 中找到序列化 DAG
Airflow 2.0 - Scheduler is unable to find serialized DAG in the serialized_dag table
我在 dags 目录中有 2 个文件 - dag_1.py 和 dag_2.py
dag_1.py 创建静态 DAG,dag_2.py 在某个位置基于外部 json 文件创建动态 DAG。
静态 DAG(由 dag_1.py 创建)在后期包含一个任务,该任务为 dag_2.py 生成一些输入 json 文件,动态 DAG 就是以这种方式创建的.
这过去适用于未使用 DAG 序列化的 Airflow 1.x 版本。但是在 Airflow 2.0 中,DAG 序列化已经成为强制性的。有时,生成动态 DAG 时,我会在调度程序中遇到以下异常 -
[2021-01-02 06:17:39,493] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
self._run_scheduler_loop()
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
self._create_dag_runs(query.all(), session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
return func(*args, **kwargs)
File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
self._add_dag_from_db(dag_id=dag_id, session=session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table
在此之后,调度程序将终止,这是预期的。
当我在此错误后手动检查 table 时,我能够在其中看到 DAG 条目。
此问题并非始终可重现。这可能是什么原因?有没有我应该尝试调整的 Airflow 配置?
没有足够的代表发表评论,所以不得不留下答案,但是:
- 这是全新的 2.0 安装还是旧 1.10.x 实例的升级?和
- 你在回收名字吗?
我真的遇到了这个问题(我发现这个问题是谷歌搜索看看还有谁在同一条船上)。
在我的例子中,它是一个升级的现有 1.10.x 安装,虽然 dags 是动态生成的,但名称是回收的。我在 GUI 中单击 dag 时遇到错误 并且 它正在杀死调度程序。
Turns Out(TM),使用 GUI 概览中的 'trashcan' 按钮完全删除 dags 并让它们重新生成修复它(例如,问题立即消失并且在最后一次没有再次出现30 分钟)。
我猜测,在 db upgrade
步骤中,动态 dag 的某些方面可能没有正确迁移,然后将它们清除并让它们完全重新生成就解决了这个问题。显然,您会丢失所有历史记录等,但是(至少在我的情况下)这不一定是什么大问题。
按以下顺序更新后我们遇到了同样的问题:
- 1.10.12 -> 1.10.14
- 1.10.14 -> 2.0.0
我已经按照他们的指南进行操作,直到几个小时后调度程序开始崩溃并抱怨在数据库中找不到随机 DAG 后的某个随机点,我们才遇到任何问题。
我们的部署过程涉及清除 /opt/airflow/dags
文件夹并每次执行全新安装(我们将 dag 和支持代码存储在 python 包中)
所以在 1.10.x 版本中,我们偶尔会遇到调度程序解析空文件夹并从数据库中删除序列化 dags 的情况,但它总是能够在下一次解析时恢复图片
显然在 2.0 中,作为使调度器 HA 工作的一部分,他们将 DAG 处理器和调度器完全分离。这会导致竞争条件:
- 如果调度程序作业在 之前 DAG 处理器更新了 serialized_dag table 值,它发现什么都没有和崩溃
- 如果运气好的话,上面的情况就不会发生,你也不会看到这个异常
为了解决这个问题,我通过在数据库中更新 is_paused
来禁用所有 DAG 的调度,重新启动调度程序,一旦它生成序列化的 dag,就将所有 dag 重新打开
我在 https://github.com/apache/airflow/pull/13893 中解决了这个问题,它将作为 Airflow 2.0.1 的一部分发布。
将在下周(2021 年 2 月 8 日 - 最有可能)发布 Airflow 2.0.1。
选择的答案对我不起作用(在我的头上敲了几个小时之后)。
这是有效的:
只要到后端数据库(postgresql)删除所有关于logs, task instances, faild task and ...
的记录 但不要删除主表(如果你不能分辨差异,只需删除我提到的表)
然后去做airflow db init
似乎有关过时和已删除的 dag 和任务的旧数据确实会使气流变得一团糟。删除混乱,让它工作。
我在 dags 目录中有 2 个文件 - dag_1.py 和 dag_2.py
dag_1.py 创建静态 DAG,dag_2.py 在某个位置基于外部 json 文件创建动态 DAG。
静态 DAG(由 dag_1.py 创建)在后期包含一个任务,该任务为 dag_2.py 生成一些输入 json 文件,动态 DAG 就是以这种方式创建的.
这过去适用于未使用 DAG 序列化的 Airflow 1.x 版本。但是在 Airflow 2.0 中,DAG 序列化已经成为强制性的。有时,生成动态 DAG 时,我会在调度程序中遇到以下异常 -
[2021-01-02 06:17:39,493] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
self._run_scheduler_loop()
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
self._create_dag_runs(query.all(), session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
return func(*args, **kwargs)
File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
self._add_dag_from_db(dag_id=dag_id, session=session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table
在此之后,调度程序将终止,这是预期的。 当我在此错误后手动检查 table 时,我能够在其中看到 DAG 条目。
此问题并非始终可重现。这可能是什么原因?有没有我应该尝试调整的 Airflow 配置?
没有足够的代表发表评论,所以不得不留下答案,但是:
- 这是全新的 2.0 安装还是旧 1.10.x 实例的升级?和
- 你在回收名字吗?
我真的遇到了这个问题(我发现这个问题是谷歌搜索看看还有谁在同一条船上)。
在我的例子中,它是一个升级的现有 1.10.x 安装,虽然 dags 是动态生成的,但名称是回收的。我在 GUI 中单击 dag 时遇到错误 并且 它正在杀死调度程序。
Turns Out(TM),使用 GUI 概览中的 'trashcan' 按钮完全删除 dags 并让它们重新生成修复它(例如,问题立即消失并且在最后一次没有再次出现30 分钟)。
我猜测,在 db upgrade
步骤中,动态 dag 的某些方面可能没有正确迁移,然后将它们清除并让它们完全重新生成就解决了这个问题。显然,您会丢失所有历史记录等,但是(至少在我的情况下)这不一定是什么大问题。
按以下顺序更新后我们遇到了同样的问题:
- 1.10.12 -> 1.10.14
- 1.10.14 -> 2.0.0
我已经按照他们的指南进行操作,直到几个小时后调度程序开始崩溃并抱怨在数据库中找不到随机 DAG 后的某个随机点,我们才遇到任何问题。
我们的部署过程涉及清除 /opt/airflow/dags
文件夹并每次执行全新安装(我们将 dag 和支持代码存储在 python 包中)
所以在 1.10.x 版本中,我们偶尔会遇到调度程序解析空文件夹并从数据库中删除序列化 dags 的情况,但它总是能够在下一次解析时恢复图片
显然在 2.0 中,作为使调度器 HA 工作的一部分,他们将 DAG 处理器和调度器完全分离。这会导致竞争条件:
- 如果调度程序作业在 之前 DAG 处理器更新了 serialized_dag table 值,它发现什么都没有和崩溃
- 如果运气好的话,上面的情况就不会发生,你也不会看到这个异常
为了解决这个问题,我通过在数据库中更新 is_paused
来禁用所有 DAG 的调度,重新启动调度程序,一旦它生成序列化的 dag,就将所有 dag 重新打开
我在 https://github.com/apache/airflow/pull/13893 中解决了这个问题,它将作为 Airflow 2.0.1 的一部分发布。
将在下周(2021 年 2 月 8 日 - 最有可能)发布 Airflow 2.0.1。
选择的答案对我不起作用(在我的头上敲了几个小时之后)。 这是有效的:
只要到后端数据库(postgresql)删除所有关于logs, task instances, faild task and ...
的记录 但不要删除主表(如果你不能分辨差异,只需删除我提到的表)
然后去做airflow db init
似乎有关过时和已删除的 dag 和任务的旧数据确实会使气流变得一团糟。删除混乱,让它工作。