气流调度器故障
Airflow scheduler failure
我已关注
this 尝试使用我自己的 DAG 在本地主机上构建气流集群的教程。当我 运行 airflow scheduler
在配置文件中设置 executor = CeleryExecutor
后,我收到以下回溯:
Traceback (most recent call last):
File "/home/yurii/Tools/anaconda3/bin/airflow", line 28, in
args.func(args)
File"/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py", line 839, in scheduler job.run()
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 200, in run
self._execute()
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1309, in _execute
self._execute_helper(processor_manager)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1441, in _execute_helper
self.executor.heartbeat()
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 124, in heartbeat
self.execute_async(key, command=command, queue=queue)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 80, in execute_async
args=[command], queue=queue)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/task.py", line 573, in apply_async
**dict(self._get_exec_options(), **options)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/base.py", line 354, in send_task
reply_to=reply_to or self.oid, **options
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/amqp.py", line 310, in publish_task
**kwargs
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 172, in publish
routing_key, mandatory, immediate, exchange, declare)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/connection.py", line 449, in _ensured
return fun(*args, **kwargs)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 188, in _publish
mandatory=mandatory, immediate=immediate,
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/librabbitmq/init.py", line 122, in basic_publish
mandatory or False, immediate or False,
TypeError: an integer is required (got type NoneType)
一些附加信息:
- 我正在使用 Airflow 1.8.0 以及 Celery 3.1.25 和 RabbitMQ 3.5.7 作为代理和后端,但也尝试了 Airflow 1.9.0 和 Celery 4.2。
- 气流与顺序执行器一起工作没有任何问题。
- `气流测试 "dag_name" "task_name" "exec_date" 运行成功。
我是 Airflow/Celery/RabbitMQ/SQL 的新手,如有任何帮助,我们将不胜感激!
您似乎正在使用 librabbitmq 作为 amqp 代理,这不是 celery 核心团队推荐的。使用 py-amqp 作为 rabbitmq 代理,你应该摆脱这个错误。
添加到以前的答案。使用 py-amqp 涉及从 broker_url = amqp://XXXXX
更改为 broker_url = pyamqp://XXXXX
或者
pip uninstall librabbitmq
.
此外,您可能需要将 airflow.cfg
中的 celery_result_backend
变量更改为 result_backend
。在最近的版本中,airflow.cfg
中 [celery]
节点中的变量的 celery_
前缀已被删除。
我已关注
this 尝试使用我自己的 DAG 在本地主机上构建气流集群的教程。当我 运行 airflow scheduler
在配置文件中设置 executor = CeleryExecutor
后,我收到以下回溯:
Traceback (most recent call last):
File "/home/yurii/Tools/anaconda3/bin/airflow", line 28, in args.func(args)
File"/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py", line 839, in scheduler job.run()
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 200, in run self._execute()
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1309, in _execute self._execute_helper(processor_manager)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1441, in _execute_helper self.executor.heartbeat()
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 124, in heartbeat self.execute_async(key, command=command, queue=queue)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 80, in execute_async args=[command], queue=queue)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/task.py", line 573, in apply_async **dict(self._get_exec_options(), **options)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/base.py", line 354, in send_task reply_to=reply_to or self.oid, **options
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/amqp.py", line 310, in publish_task **kwargs
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 172, in publish routing_key, mandatory, immediate, exchange, declare)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/connection.py", line 449, in _ensured return fun(*args, **kwargs)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 188, in _publish mandatory=mandatory, immediate=immediate,
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/librabbitmq/init.py", line 122, in basic_publish mandatory or False, immediate or False,
TypeError: an integer is required (got type NoneType)
一些附加信息:
- 我正在使用 Airflow 1.8.0 以及 Celery 3.1.25 和 RabbitMQ 3.5.7 作为代理和后端,但也尝试了 Airflow 1.9.0 和 Celery 4.2。
- 气流与顺序执行器一起工作没有任何问题。
- `气流测试 "dag_name" "task_name" "exec_date" 运行成功。
我是 Airflow/Celery/RabbitMQ/SQL 的新手,如有任何帮助,我们将不胜感激!
您似乎正在使用 librabbitmq 作为 amqp 代理,这不是 celery 核心团队推荐的。使用 py-amqp 作为 rabbitmq 代理,你应该摆脱这个错误。
添加到以前的答案。使用 py-amqp 涉及从 broker_url = amqp://XXXXX
更改为 broker_url = pyamqp://XXXXX
或者
pip uninstall librabbitmq
.
此外,您可能需要将 airflow.cfg
中的 celery_result_backend
变量更改为 result_backend
。在最近的版本中,airflow.cfg
中 [celery]
节点中的变量的 celery_
前缀已被删除。