Celery 一项任务启动许多小任务的最佳实践

Celery Best Practice For one Task Kicking off Many Small tasks

我对芹菜的经验是新手。我写过很多任务,既有预定的也有延迟的,但仅此而已。

我 运行 遇到一个问题,我想创建一个任务来启动 1000 个较小的作业,以缓解队列长度和可能需要数小时才能完成的作业可能出现的任何问题。

当前应用程序依赖于来自外部 API 的信息。可以这么说,用户将他们的帐户与我集成的另一项服务相关联,我想每天根据他们的外部帐户的变化来更新用户的信息。

我有这样的预定工作

@app.task() 
def refresh_accounts():
    for account in Account.objects.all():
        response = retrieve_account_info(account_id=account.id)
        account.data = response.data 
        account.save() 

--

我想要的是这样的

@app.task()
def kickoff_refresh():
    for account in Account.objects.all()
        refresh_account.delay(account_id=account.id)

@app.task() 
def refresh_account(account_id=None):
    account = Account.objects.get(id=account_id)
    response = retrieve_account_info(account_id=account.id)
    account.data = response.data 
    account.save()

我想到的一种方法是将 kickoff_refreshrefresh_account 放在不同的队列中。 @app.task(queue=q1), @app.task(queue=q2)... 但是,我不知道是否有更好的方法。在同一队列的任务中调用任务在 celery 中似乎是不好的做法 - https://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks 任务 kickoff_refresh 将是每隔几个小时的周期性任务 运行。

我很想听听对其他人有用的东西。谢谢

from celery import group


@app.task()
def kickoff_refresh(account_id=None):
    job = group(refresh_account.s(account_id=account.id) for account in Account.objects.all())()

@app.task()
def refresh_account(account_id=None):
    account = Account.objects.get(id=account_id)
    response = retrieve_account_info(account_id=account.id)
    account.data = response.data 
    account.save()