Airflow scheduler failure

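I followed this tutorial to try to build an Airflow cluster on localhost using my own DAG. When I run airflow scheduler after setting executor = CeleryExecutor in the config file, I get the following traceback: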

Traceback (most recent call last):
  File "/home/yurii/Tools/anaconda3/bin/airflow", line 28, in <module>
    args.func(args)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py", line 839, in scheduler
    job.run()
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1309, in _execute
    self._execute_helper(processor_manager)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1441, in _execute_helper
    self.executor.heartbeat()
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 124, in heartbeat
    self.execute_async(key, command=command, queue=queue)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 80, in execute_async
    args=[command], queue=queue)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/task.py", line 573, in apply_async
    **dict(self._get_exec_options(), **options)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/base.py", line 354, in send_task
    reply_to=reply_to or self.oid, **options
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/amqp.py", line 310, in publish_task
    **kwargs
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 172, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/connection.py", line 449, in _ensured
    return fun(*args, **kwargs)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 188, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/librabbitmq/__init__.py", line 122, in basic_publish
    mandatory or False, immediate or False,
TypeError: an integer is required (got type NoneType)

Some additional information:

I am new to Airflow/Celery/RabbitMQ/SQL, so any help would be appreciated!

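It looks like you are using librabbitmq as the AMQP client library for the broker connection, which is not recommended by the Celery core team. Use py-amqp to talk to RabbitMQ instead and you should get rid of this error.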

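Adding to the previous answer: with the amqp:// scheme, Celery uses librabbitmq when it is installed and falls back to py-amqp otherwise. So switching to py-amqp means either changing broker_url = amqp://XXXXX to broker_url = pyamqp://XXXXX, or running pip uninstall librabbitmq.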
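If it helps, here is a rough sketch of the first option in Python. It only rewrites the URL scheme and checks whether librabbitmq is importable in the current environment. The function names use_pyamqp and librabbitmq_installed are made up for illustration and are not part of Airflow or Celery. The URL in the usage note is a placeholder.

```python
import importlib.util


def librabbitmq_installed() -> bool:
    """Return True if the librabbitmq package can be imported here."""
    return importlib.util.find_spec("librabbitmq") is not None


def use_pyamqp(broker_url: str) -> str:
    """Swap an amqp:// scheme for pyamqp://; leave any other URL untouched.

    Hypothetical helper: it does plain string handling and does not
    validate the rest of the URL.
    """
    scheme, sep, rest = broker_url.partition("://")
    if sep and scheme == "amqp":
        return "pyamqp://" + rest
    return broker_url


if __name__ == "__main__":
    # Placeholder URL; substitute the real broker_url from airflow.cfg.
    print(use_pyamqp("amqp://XXXXX"))
    print("librabbitmq installed:", librabbitmq_installed())
```

Calling use_pyamqp on the value currently in airflow.cfg gives the string to put back into broker_url. If librabbitmq_installed() returns False, an amqp:// URL should already be going through py-amqp.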

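In addition, you may need to rename the celery_result_backend variable in airflow.cfg to result_backend. In recent versions, the celery_ prefix has been removed from the variables in the [celery] section of airflow.cfg.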
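For what it's worth, the rename can be scripted with the standard library's configparser. This is only a sketch: rename_result_backend is a hypothetical helper, the sample values are placeholders, and configparser drops comments when it writes the file back, so editing the one line by hand may be the safer route for a real airflow.cfg.

```python
import configparser
import io


def rename_result_backend(cfg_text: str) -> str:
    """Rename [celery] celery_result_backend to result_backend in config text.

    Hypothetical helper. Interpolation is disabled so that '%' characters
    in other values are read and written back verbatim.
    """
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)

    old_key, new_key = "celery_result_backend", "result_backend"
    if parser.has_section("celery") and parser.has_option("celery", old_key):
        value = parser.get("celery", old_key)
        parser.remove_option("celery", old_key)
        # Keep an existing result_backend value if one is already set.
        if not parser.has_option("celery", new_key):
            parser.set("celery", new_key, value)

    out = io.StringIO()
    parser.write(out)
    return out.getvalue()


if __name__ == "__main__":
    # Placeholder values, not a real configuration.
    sample = (
        "[celery]\n"
        "broker_url = pyamqp://XXXXX\n"
        "celery_result_backend = db+mysql://XXXXX\n"
    )
    print(rename_result_backend(sample))
```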