Flask + Celery + Redis: consumer: 无法连接到 amqp://guest:**@127.0.0.1:5672//: 超时
Flask + Celery + Redis: consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: timed out
我设置了一个简单的 celery 任务。为了 运行 它,我首先启动了 redis-server,然后激活虚拟环境并输入 "celery beat",打开一个新终端 window 进入虚拟环境并输入 "celery worker"
Flask==1.0.2
celery==4.2.1
requests==2.19
这是之后的错误信息:
consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: timed
out.
这是执行'celery beat':
后显示的配置详情
Configuration ->
. broker -> amqp://guest:**@localhost:5672//
. loader -> celery.loaders.default.Loader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
flask-proj/app/__init__.py
from flask import Flask, request, jsonify
from celery import Celery
import celeryconfig
app = Flask(__name__)
app.config.from_object('config')
def make_celery(app):
# create context tasks in celery
celery = Celery(
app.import_name,
broker=app.config['BROKER_URL']
)
celery.conf.update(app.config)
celery.config_from_object(celeryconfig)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery(app)
@app.route("/")
def hello():
return "Hello World!"
flask-proj/tasks/test.py
import celery
@celery.task()
def print_hello():
logger = print_hello.get_logger()
logger.info("Hello")
flask-proj/config.py
import os
REDIS_HOST = "127.0.0.1" REDIS_PORT = 6379 BROKER_URL = environ.get('REDIS_URL', "redis://{host}:{port}/0".format(
host=REDIS_HOST, port=str(REDIS_PORT))) CELERY_RESULT_BACKEND = BROKER_URL
flask-proj/celeryconfig.py
from celery.schedules import crontab
CELERY_IMPORTS = ('app.tasks.test')
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERYBEAT_SCHEDULE = {
'test-celery': {
'task': 'app.tasks.test.print_hello',
# Every minute
'schedule': crontab(minute="*"),
}
}
如果我需要提供其他详细信息,请告诉我。
amqp 是 rabbitmq 而不是 redis。
Redis 通常是
redis://:password@hostname:port/db_number
我会手动配置以查看它是否有效。
flask_app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
从 make_celery()
函数中删除 celery.conf.update(app.config)
行,因此它会像,
def make_celery(app):
# create context tasks in celery
celery = Celery(
app.import_name,
broker=app.config['BROKER_URL']
)
<strike>celery.conf.update(app.config)</strike><b> # remove this line.</b>
celery.config_from_object(celeryconfig)
TaskBase = celery.Task
并且,
将 flask-proj/config.py
的粘贴内容复制到 flask-proj/celeryconfig.py
。
因此他flask-proj/celeryconfig.py
会像,
from celery.schedules import crontab
<b>import os
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
BROKER_URL = os.environ.get(
'REDIS_URL', "redis://{host}:{port}/0".format(
host=REDIS_HOST, port=str(REDIS_PORT)))
CELERY_RESULT_BACKEND = BROKER_URL</b>
CELERY_IMPORTS = ('app.tasks.test')
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERYBEAT_SCHEDULE = {
'test-celery': {
'task': 'app.tasks.test.print_hello',
# Every minute
'schedule': crontab(minute="*"),
}
}
在 Django 中有同样的问题,但我的问题原来是在 settings.py 中使用 "BROKER_URL" 而不是 "CELERY_BROKER_URL"。 Celery 没有找到 URL 并且默认使用 rabbitmq 端口而不是 redis 端口。
我设置了一个简单的 celery 任务。为了 运行 它,我首先启动了 redis-server,然后激活虚拟环境并输入 "celery beat",打开一个新终端 window 进入虚拟环境并输入 "celery worker"
Flask==1.0.2
celery==4.2.1
requests==2.19
这是之后的错误信息:
consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: timed out.
这是执行'celery beat':
后显示的配置详情Configuration -> . broker -> amqp://guest:**@localhost:5672// . loader -> celery.loaders.default.Loader . scheduler -> celery.beat.PersistentScheduler . db -> celerybeat-schedule . logfile -> [stderr]@%WARNING . maxinterval -> 5.00 minutes (300s)
flask-proj/app/__init__.py
from flask import Flask, request, jsonify
from celery import Celery
import celeryconfig
app = Flask(__name__)
app.config.from_object('config')
def make_celery(app):
# create context tasks in celery
celery = Celery(
app.import_name,
broker=app.config['BROKER_URL']
)
celery.conf.update(app.config)
celery.config_from_object(celeryconfig)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery(app)
@app.route("/")
def hello():
return "Hello World!"
flask-proj/tasks/test.py
import celery
@celery.task()
def print_hello():
logger = print_hello.get_logger()
logger.info("Hello")
flask-proj/config.py
import os
REDIS_HOST = "127.0.0.1" REDIS_PORT = 6379 BROKER_URL = environ.get('REDIS_URL', "redis://{host}:{port}/0".format(
host=REDIS_HOST, port=str(REDIS_PORT))) CELERY_RESULT_BACKEND = BROKER_URL
flask-proj/celeryconfig.py
from celery.schedules import crontab
CELERY_IMPORTS = ('app.tasks.test')
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERYBEAT_SCHEDULE = {
'test-celery': {
'task': 'app.tasks.test.print_hello',
# Every minute
'schedule': crontab(minute="*"),
}
}
如果我需要提供其他详细信息,请告诉我。
amqp 是 rabbitmq 而不是 redis。
Redis 通常是
redis://:password@hostname:port/db_number
我会手动配置以查看它是否有效。
flask_app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
从 make_celery()
函数中删除 celery.conf.update(app.config)
行,因此它会像,
def make_celery(app):
# create context tasks in celery
celery = Celery(
app.import_name,
broker=app.config['BROKER_URL']
)
<strike>celery.conf.update(app.config)</strike><b> # remove this line.</b>
celery.config_from_object(celeryconfig)
TaskBase = celery.Task
并且,
将 flask-proj/config.py
的粘贴内容复制到 flask-proj/celeryconfig.py
。
因此他flask-proj/celeryconfig.py
会像,
from celery.schedules import crontab
<b>import os
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
BROKER_URL = os.environ.get(
'REDIS_URL', "redis://{host}:{port}/0".format(
host=REDIS_HOST, port=str(REDIS_PORT)))
CELERY_RESULT_BACKEND = BROKER_URL</b>
CELERY_IMPORTS = ('app.tasks.test')
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERYBEAT_SCHEDULE = {
'test-celery': {
'task': 'app.tasks.test.print_hello',
# Every minute
'schedule': crontab(minute="*"),
}
}
在 Django 中有同样的问题,但我的问题原来是在 settings.py 中使用 "BROKER_URL" 而不是 "CELERY_BROKER_URL"。 Celery 没有找到 URL 并且默认使用 rabbitmq 端口而不是 redis 端口。