Airflow Scheduler crashes when setting Postgres as the Celery result_backend
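I am trying to set up Apache Airflow with the CeleryExecutor. I use Postgres for the database and Redis for the Celery message queue. Everything works with the LocalExecutor, but when I set CeleryExecutor in airflow.cfg and try to use the Postgres database as the result_backend: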
result_backend = postgresql+psycopg2://airflow_user:*******@localhost/airflow
then whenever I run the Airflow scheduler I get this error, no matter which DAG I trigger:
[2020-03-18 14:14:13,341] {scheduler_job.py:1382} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'backend'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1380, in _execute
self._execute_helper()
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1441, in _execute_helper
if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1503, in _validate_and_run_task_instances
self.executor.heartbeat()
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
self.trigger_tasks(open_slots)
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 205, in trigger_tasks
cached_celery_backend = tasks[0].backend
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
return getattr(self._get_current_object(), name)
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/app/task.py", line 1037, in backend
return self.app.backend
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/app/base.py", line 1227, in backend
return self._get_backend()
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/app/base.py", line 944, in _get_backend
self.loader)
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/app/backends.py", line 74, in by_url
return by_name(backend, loader), url
File "<PATH_TO_VIRTUALENV>/lib/python3.6/site-packages/celery/app/backends.py", line 60, in by_name
backend, 'is a Python module, not a backend class.'))
celery.exceptions.ImproperlyConfigured: Unknown result backend: 'postgresql'. Did you spell that correctly? ('is a Python module, not a backend class.')
The exact same connection string works for the metadata database setting:
sql_alchemy_conn = postgresql+psycopg2://airflow_user:*******@localhost/airflow
Setting Redis as the Celery result_backend works, but from what I have read this is not the recommended approach.
result_backend = redis://localhost:6379/0
Can anyone see what I am doing wrong?
You need to add the db+ prefix to the database connection string:
f"db+postgresql+psycopg2://{user}:{password}@{host}/{database}"
This is also mentioned in the documentation: https://docs.celeryproject.org/en/stable/userguide/configuration.html#database-url-examples
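To illustrate why the prefix matters: judging from the error message, Celery seems to take whatever precedes the first + in the URL scheme as the backend name and hands the remainder to that backend. The sketch below is a simplified imitation of that splitting, not Celery's own code; the function name split_backend_url and the sample credentials are made up for illustration.

```python
def split_backend_url(url):
    """Simplified imitation of how a result backend URL might be split.

    Assumption: the text before the first '+' in the scheme names the
    backend, and the remainder is passed to that backend as its own URL.
    """
    scheme, _, _ = url.partition("://")
    if "+" in scheme:
        backend, rest = url.split("+", 1)
        return backend, rest
    # No '+' in the scheme: the scheme itself is the backend name.
    return scheme, url


# Without the prefix, 'postgresql' is treated as the backend name,
# which matches the "Unknown result backend: 'postgresql'" error.
print(split_backend_url("postgresql+psycopg2://user:pw@localhost/airflow"))

# With the prefix, 'db' is the backend and the rest is a SQLAlchemy URL.
print(split_backend_url("db+postgresql+psycopg2://user:pw@localhost/airflow"))
```

Under this reading, redis://localhost:6379/0 works unchanged because redis is itself a known backend name.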
You need to add the db+ prefix to the database connection string:
result_backend = db+postgresql://airflow_user:*******@localhost/airflow
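If you generate airflow.cfg values from a script, a small helper can derive the result_backend value from the string you already use for sql_alchemy_conn. This is only a sketch: to_celery_result_backend is a hypothetical name, and it assumes the only change needed is the db+ prefix.

```python
def to_celery_result_backend(sqlalchemy_url):
    """Return the URL with the 'db+' prefix that Celery's database backend expects.

    Hypothetical helper: leaves the URL alone if the prefix is already there.
    """
    if sqlalchemy_url.startswith("db+"):
        return sqlalchemy_url
    return "db+" + sqlalchemy_url


# Placeholder credentials, for illustration only.
sql_alchemy_conn = "postgresql+psycopg2://airflow_user:secret@localhost/airflow"
print("result_backend =", to_celery_result_backend(sql_alchemy_conn))
```

The check for an existing prefix keeps the helper safe to call on a value that has already been converted.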