Airflow scheduler crashing: AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback'

What does this error mean? I have a simple Airflow setup in which I run the airflow helm chart with CeleryKubernetesExecutor on a local kind/kubernetes cluster.

│ scheduler [2022-05-25 06:55:40,086] {dagrun.py:648} WARNING - Failed to get task '<TaskInstance: tutorial.generated-task-13 manual__2022 │
│ scheduler [2022-05-25 06:55:40,086] {dagrun.py:648} WARNING - Failed to get task '<TaskInstance: tutorial.generated-task-17 manual__2022 │
│ scheduler [2022-05-25 06:55:40,207] {scheduler_job.py:1347} WARNING - Failing (6) jobs without heartbeat after 2022-05-25 06:50:40.19719 │
│ scheduler [2022-05-25 06:55:40,207] {scheduler_job.py:1355} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/repo/dags/ │
│ scheduler [2022-05-25 06:55:40,208] {scheduler_job.py:753} ERROR - Exception when executing SchedulerJob._run_scheduler_loop             │
│ scheduler Traceback (most recent call last):                                                                                             │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute                 │
│ scheduler     self._run_scheduler_loop()                                                                                                 │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 836, in _run_scheduler_loop      │
│ scheduler     next_event = timers.run(blocking=False)                                                                                    │
│ scheduler   File "/usr/local/lib/python3.7/sched.py", line 151, in run                                                                   │
│ scheduler     action(*argument, **kwargs)                                                                                                │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat                 │
│ scheduler     action(*args, **kwargs)                                                                                                    │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper                        │
│ scheduler     return func(*args, session=session, **kwargs)                                                                              │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1356, in _find_zombies           │
│ scheduler     self.executor.send_callback(request)                                                                                       │
│ scheduler AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback'                                             │
│ scheduler [2022-05-25 06:55:40,218] {kubernetes_executor.py:807} INFO - Shutting down Kubernetes executor                                │
│ scheduler [2022-05-25 06:55:41,237] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 64. PIDs of all processes in the grou │
│ scheduler [2022-05-25 06:55:41,237] {process_utils.py:80} INFO - Sending the signal Signals.SIGTERM to group 64                          │
│ scheduler [2022-05-25 06:55:41,409] {process_utils.py:75} INFO - Process psutil.Process(pid=64, status='terminated', exitcode=0, started │
│ scheduler [2022-05-25 06:55:41,410] {scheduler_job.py:765} INFO - Exited execute loop                                                    │
│ scheduler Traceback (most recent call last):                                                                                             │
│ scheduler   File "/home/airflow/.local/bin/airflow", line 8, in <module>                                                                 │
│ scheduler     sys.exit(main())                                                                                                           │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main                                │
│ scheduler     args.func(args)                                                                                                            │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command                       │
│ scheduler     return func(*args, **kwargs)                                                                                               │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper                            │
│ scheduler     return f(*args, **kwargs)                                                                                                  │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler     │
│ scheduler     _run_scheduler_job(args=args)                                                                                              │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_schedule │
│ scheduler     job.run()                                                                                                                  │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 244, in run                           │
│ scheduler     self._execute()                                                                                                            │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute                 │
│ scheduler     self._run_scheduler_loop()                                                                                                 │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 836, in _run_scheduler_loop      │
│ scheduler     next_event = timers.run(blocking=False)                                                                                    │
│ scheduler   File "/usr/local/lib/python3.7/sched.py", line 151, in run                                                                   │
│ scheduler     action(*argument, **kwargs)                                                                                                │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat                 │
│ scheduler     action(*args, **kwargs)                                                                                                    │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper                        │
│ scheduler     return func(*args, session=session, **kwargs)                                                                              │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1356, in _find_zombies           │
│ scheduler     self.executor.send_callback(request)                                                                                       │
│ scheduler AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback'                                             │

Is the error caused by my updating the DAG definition, so that some tasks are now zombie/invalid? I suspect the crash comes from "AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback'", but what is actually going on?

This appears to be an Airflow bug in version 2.3.0.

It is fixed in 2.3.1.
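
To show the shape of the failure, here is a minimal sketch in plain Python. It does not use Airflow's real classes: `BaseExecutorSketch`, `WrapperWithoutCallback`, `WrapperWithCallback` and `find_zombies` are hypothetical stand-ins. The only thing taken from the traceback is that `_find_zombies` calls `self.executor.send_callback(request)` and that the executor object has no such attribute. Forwarding to the Celery side in the "fixed" wrapper is an arbitrary choice for this sketch, not necessarily how 2.3.1 does it.

```python
# Toy stand-ins, NOT Airflow's real classes: they only mimic the shape of the bug.
class BaseExecutorSketch:
    """Hypothetical executor that knows how to accept callbacks."""

    def __init__(self):
        self.sent = []

    def send_callback(self, request):
        self.sent.append(request)


class WrapperWithoutCallback:
    """Hypothetical composite executor: wraps two executors but
    does not define send_callback itself."""

    def __init__(self, celery_executor, kubernetes_executor):
        self.celery_executor = celery_executor
        self.kubernetes_executor = kubernetes_executor


class WrapperWithCallback(WrapperWithoutCallback):
    """Same wrapper with the missing method added (one possible fix)."""

    def send_callback(self, request):
        # Assumption for this sketch: forward to the Celery side.
        self.celery_executor.send_callback(request)


def find_zombies(executor, zombie_requests):
    """Mimics the scheduler step in the traceback: one callback per zombie."""
    for request in zombie_requests:
        executor.send_callback(request)


def demo():
    broken = WrapperWithoutCallback(BaseExecutorSketch(), BaseExecutorSketch())
    try:
        find_zombies(broken, ["zombie-task"])
        crashed = False
    except AttributeError:
        # Same class of error as in the scheduler log.
        crashed = True

    fixed = WrapperWithCallback(BaseExecutorSketch(), BaseExecutorSketch())
    find_zombies(fixed, ["zombie-task"])
    return crashed, fixed.celery_executor.sent
```

If this reading is right, the zombie tasks are only the trigger: the log shows "Detected zombie job" right before the crash, and the call is only reached once a zombie has been found, so a scheduler with no zombies would not hit it.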