Airflow scheduler crashing: AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback'
Airflow scheduler crashing: AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback'
这个错误是什么意思?我有一个简单的气流设置,其中我 运行 本地 kind/kubernetes 集群上的 airflow helm chart 和 CeleryKubernetesExecutor
│ scheduler [2022-05-25 06:55:40,086] {dagrun.py:648} WARNING - Failed to get task '<TaskInstance: tutorial.generated-task-13 manual__2022 │
│ scheduler [2022-05-25 06:55:40,086] {dagrun.py:648} WARNING - Failed to get task '<TaskInstance: tutorial.generated-task-17 manual__2022 │
│ scheduler [2022-05-25 06:55:40,207] {scheduler_job.py:1347} WARNING - Failing (6) jobs without heartbeat after 2022-05-25 06:50:40.19719 │
│ scheduler [2022-05-25 06:55:40,207] {scheduler_job.py:1355} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/repo/dags/ │
│ scheduler [2022-05-25 06:55:40,208] {scheduler_job.py:753} ERROR - Exception when executing SchedulerJob._run_scheduler_loop │
│ scheduler Traceback (most recent call last): │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute │
│ scheduler self._run_scheduler_loop() │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 836, in _run_scheduler_loop │
│ scheduler next_event = timers.run(blocking=False) │
│ scheduler File "/usr/local/lib/python3.7/sched.py", line 151, in run │
│ scheduler action(*argument, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat │
│ scheduler action(*args, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper │
│ scheduler return func(*args, session=session, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1356, in _find_zombies │
│ scheduler self.executor.send_callback(request) │
│ scheduler AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback' │
│ scheduler [2022-05-25 06:55:40,218] {kubernetes_executor.py:807} INFO - Shutting down Kubernetes executor │
│ scheduler [2022-05-25 06:55:41,237] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 64. PIDs of all processes in the grou │
│ scheduler [2022-05-25 06:55:41,237] {process_utils.py:80} INFO - Sending the signal Signals.SIGTERM to group 64 │
│ scheduler [2022-05-25 06:55:41,409] {process_utils.py:75} INFO - Process psutil.Process(pid=64, status='terminated', exitcode=0, started │
│ scheduler [2022-05-25 06:55:41,410] {scheduler_job.py:765} INFO - Exited execute loop │
│ scheduler Traceback (most recent call last): │
│ scheduler File "/home/airflow/.local/bin/airflow", line 8, in <module> │
│ scheduler sys.exit(main()) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main │
│ scheduler args.func(args) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command │
│ scheduler return func(*args, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper │
│ scheduler return f(*args, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler │
│ scheduler _run_scheduler_job(args=args) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_schedule │
│ scheduler job.run() │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 244, in run │
│ scheduler self._execute() │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute │
│ scheduler self._run_scheduler_loop() │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 836, in _run_scheduler_loop │
│ scheduler next_event = timers.run(blocking=False) │
│ scheduler File "/usr/local/lib/python3.7/sched.py", line 151, in run │
│ scheduler action(*argument, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat │
│ scheduler action(*args, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper │
│ scheduler return func(*args, session=session, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1356, in _find_zombies │
│ scheduler self.executor.send_callback(request) │
│ scheduler AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback' │
是不是因为我更新了我的 dag 定义而出错,现在有些任务 zombie/invalid?我怀疑这是导致崩溃的原因是“AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback'”,但这是怎么回事?
这个错误是什么意思?我有一个简单的气流设置,其中我 运行 本地 kind/kubernetes 集群上的 airflow helm chart 和 CeleryKubernetesExecutor
│ scheduler [2022-05-25 06:55:40,086] {dagrun.py:648} WARNING - Failed to get task '<TaskInstance: tutorial.generated-task-13 manual__2022 │
│ scheduler [2022-05-25 06:55:40,086] {dagrun.py:648} WARNING - Failed to get task '<TaskInstance: tutorial.generated-task-17 manual__2022 │
│ scheduler [2022-05-25 06:55:40,207] {scheduler_job.py:1347} WARNING - Failing (6) jobs without heartbeat after 2022-05-25 06:50:40.19719 │
│ scheduler [2022-05-25 06:55:40,207] {scheduler_job.py:1355} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/repo/dags/ │
│ scheduler [2022-05-25 06:55:40,208] {scheduler_job.py:753} ERROR - Exception when executing SchedulerJob._run_scheduler_loop │
│ scheduler Traceback (most recent call last): │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute │
│ scheduler self._run_scheduler_loop() │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 836, in _run_scheduler_loop │
│ scheduler next_event = timers.run(blocking=False) │
│ scheduler File "/usr/local/lib/python3.7/sched.py", line 151, in run │
│ scheduler action(*argument, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat │
│ scheduler action(*args, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper │
│ scheduler return func(*args, session=session, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1356, in _find_zombies │
│ scheduler self.executor.send_callback(request) │
│ scheduler AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback' │
│ scheduler [2022-05-25 06:55:40,218] {kubernetes_executor.py:807} INFO - Shutting down Kubernetes executor │
│ scheduler [2022-05-25 06:55:41,237] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 64. PIDs of all processes in the grou │
│ scheduler [2022-05-25 06:55:41,237] {process_utils.py:80} INFO - Sending the signal Signals.SIGTERM to group 64 │
│ scheduler [2022-05-25 06:55:41,409] {process_utils.py:75} INFO - Process psutil.Process(pid=64, status='terminated', exitcode=0, started │
│ scheduler [2022-05-25 06:55:41,410] {scheduler_job.py:765} INFO - Exited execute loop │
│ scheduler Traceback (most recent call last): │
│ scheduler File "/home/airflow/.local/bin/airflow", line 8, in <module> │
│ scheduler sys.exit(main()) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main │
│ scheduler args.func(args) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command │
│ scheduler return func(*args, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper │
│ scheduler return f(*args, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler │
│ scheduler _run_scheduler_job(args=args) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_schedule │
│ scheduler job.run() │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 244, in run │
│ scheduler self._execute() │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute │
│ scheduler self._run_scheduler_loop() │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 836, in _run_scheduler_loop │
│ scheduler next_event = timers.run(blocking=False) │
│ scheduler File "/usr/local/lib/python3.7/sched.py", line 151, in run │
│ scheduler action(*argument, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat │
│ scheduler action(*args, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper │
│ scheduler return func(*args, session=session, **kwargs) │
│ scheduler File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1356, in _find_zombies │
│ scheduler self.executor.send_callback(request) │
│ scheduler AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback' │
是不是因为我更新了我的 dag 定义而出错,现在有些任务 zombie/invalid?我怀疑这是导致崩溃的原因是“AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback'”,但这是怎么回事?