为什么芹菜重试,但我的工作没有失败?

Why does celery retry, but my job did not fail?

我有一个到 运行 MySQL 数据库的 celery 作业,但是,它总是 Lock Wait Timeout。在深入研究数据库查询后,我意识到芹菜在 1800 秒后触发了另一项工作,并导致我的数据库出现问题。我不知道为什么 – 我的工作还没有失败!

@celery.task(bind=True, acks_late=True)
def etl_pipeline(dev=dev, test=test):

我可以看出 MySQL 再次收到相同的查询,可能是 Celery 触发了相同的作业。为什么这里我重试了,默认重试是180秒(3分钟)。

这是官方文档:

default_retry_delay = 180

Default time in seconds before a retry of the task should be executed. 3 minutes by default.

但我的情况是 1800 秒

此外,我的经纪人收到了一些其他警告,我不确定这是否相关:

The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend.

配置 RabbitMq

RABBITMQ_SERVER = 'amqp://{}:{}@{}'.format(
    os.getenv('RABBITMQ_USER'),
    os.getenv('RABBITMQ_PASS'),
    os.getenv('RABBITMQ_HOST')
)
broker_url = '{}/{}'.format(
    RABBITMQ_SERVER,
    os.getenv('RABBITMQ_VHOST'),
)
backend = 'amqp'

我该如何解决这个问题?谢谢!

Celery: 4.2.0

I am using job = chain(single_job), but i only have one single_job job() starting the job.

mysql> show processlist;
+-------+------+---------------+------------------+---------+------+-----------+
| Id    | User | Host          | db               | Command | Time | State     |
+-------+------+---------------+------------------+---------+------+-----------+
| 97189 | clp  | 172.11.17.202 | bain_ai_database | Query   |    0 | init      |
| 97488 | clp  | 172.11.11.252 | bain_ai_database | Query   | 1505 | executing |
| 97489 | clp  | 172.11.11.252 | bain_ai_database | Sleep   | 1851 |           |
| 97543 | clp  | 172.21.6.242  | bain_ai_database | Query   |   51 | updating  |
| 97544 | clp  | 172.21.6.242  | bain_ai_database | Sleep   |   51 |           |
+-------+------+---------------+------------------+---------+------+-----------+

根据您执行 sql 查询的方式,我会尝试以下方法。 (1) 因为你有 bind=True,任务应该是你函数的第一个参数。 celery 中的惯例是调用第一个参数 self。 (2) 您想尝试捕获正在发生的数据库级异常并忽略它。

from celery.utils.log import get_task_logger

log = get_task_logger(__name__)


@celery.task(bind=True, acks_late=True)
def etl_pipeline(self, dev=dev, test=test):
    try:
        # try querying the database here using sqlalchemy or mysqlconnect??
    except Exception as ex:
        # for now, log the exception and type so that you can drill down into what is happening
        log.info('[etl_pipeline]  exception of type %s.%s: %s', ex.__class__.__module__, ex.__class__.__name__, ex)
        raise       

您将从日志记录中获得的调试应该可以帮助您确定您在客户端遇到的错误。