使用芹菜在Django中非周期性重复任务

Non-periodic repetition of task in Django using celery

我需要 运行 一个随机时间间隔的任务 "CreateNotifications"。 这是我在 CELERY 的设置中尝试做的事情。

t = random.randint(45, 85)
## print "time = ", t

## celery app configuration
app.conf.CELERYBEAT_SCHEDULE = {
    # Executes at every 't' interval, where t is random
    'create-notifications': {
        'task': 'apps.notifications.tasks.CreateNotifications',
        'schedule': timedelta(seconds=t),
    },
}

现在的问题是 CELERY 的这些设置只执行一次(当我 运行 命令 python manage.py 运行 服务器时),因此变量 't' 因此 timedelta 中 'seconds' 的值得到一个随机值,但只有一次。

最终使上述过程成为一个周期性的过程,固定周期为X秒,当我启动服务器时随机选择X。

或者,我已经尝试 运行 执行单个任务,并在其中使用一个随机延迟的无休止 while 循环,以便 celery 仅自动检测一个任务,并且该任务永远不会结束。我的目的是通过 while 循环中的随机延迟来解决的。 像这样(注 -> 'while' 在函数 CreateNotifications() 内)

@app.task
def CreateNotifications():

while True:

    upper_limit = models.MyUser.objects.all().aggregate(Max('id'))
    lower_limit = models.MyUser.objects.all().aggregate(Min('id'))

    ## select a user to be notified randomly

    to_user = None
    to = 0

    while to_user is None:
        to = random.randint(lower_limit['id__min'], upper_limit['id__max'])
        try:
            to_user = models.MyUser.objects.get(id=to)
        except:
            pass

    ## select a user to be notified from randomly

    frm_user = None
    frm = to

    while frm_user is None:
        while frm == to:
            frm = random.randint(lower_limit['id__min'], upper_limit['id__max'])
        try:
            frm_user = models.MyUser.objects.get(id=frm)
        except:
            pass

    notif_type = ['comment on', 'liked', 'shared']
    notif_media = ['post', 'picture', 'video']

    models.Notification.objects.create(
        notified_user = to_user,
        notifier = frm_user,
        notification_type = random.choice(notif_type),
        notification_media = random.choice(notif_media))

    to_user.new_notification_count += 1
    to_user.save()


    t = random.randint(35, 55)
    print "delay = ", t
    time.sleep(t)

它完全按照我的意愿做事,但现在有 4 个不同的工作人员执行相同的任务,但我只想要一个。

我已经尝试更改位于我的 virtualenv/bin/ 目录中的 celeryd 文件,如此处所示 -> Celery. Decrease number of processes

因为 /etc/defaults/ 中没有 celeryd 文件,但仍然没有成功

我们将不胜感激。

你真的不应该在 Celery 任务中放入无限循环。如果您需要一个专门的进程,最好 运行 它自己而不是通过 Celery。

您以后可以在任务 运行 时重新安排新任务。类似于以下内容:

@app.task
def create_notifications():
     try:
          #
     finally:
          t = random.randint(45, 85)
          create_notifications.apply_async(countdown=t)

另一种选择是让调度程序任务 运行 定期执行,但在不久的将来随机时间将通知任务排队。例如,如果调度程序任务 运行s 每 45 秒。

@app.task
def schedule_notifications():
     t = random.randint(0, 40)
     create_notifications.apply_async(countdown=t)