Celery 任务 运行 每 20 秒重叠并在最后一个完成之前开始

Celery tasks running every 20 seconds is overlapping and starting before the last can finish

我有一个 celery 任务 运行 每 20 秒一次,跨 3 个实例都连接到一个数据库。问题是处理程序有时会两次触发任务重叠。似乎在任务重叠时过滤的项目没有更新:

@periodic_task(run_every=timedelta(seconds=20))
def process_webhook_transactions():
    """Process webhook transactions"""
    transactions = WebhookTransaction.objects.filter(status=WebhookTransaction.UNPROCESSED)
    for transaction in transactions:
        data = transaction.body
        event = data.get('event_category')
        if event is None:
            transaction.status = WebhookTransaction.ERROR
            transaction.save()
            continue
        
        
        handler = WEBHOOK_HANDLERS.get(event, default_handler)
        success = handler(data)

        if success:
            transaction.status = WebhookTransaction.PROCESSED
        else:
            transaction.status = WebhookTransaction.ERROR
        transaction.save()

避免这种情况的最佳方法是什么?

您可以使用 select_for_updateskip_locked 来防止 3 个工作人员同时 运行 该任务时出现重复的行。像这样:

transactions = WebhookTransaction.objects.filter(status=WebhookTransaction.UNPROCESSED)
transactions = transactions.select_for_update(skip_locked=True, of=("self",))

但这种方法会使一个工作实例比其他工作实例更努力地工作(第一个任务选择了所有事务,而其他任务则没有多少事务)。您可以在 20 秒内创建一个同样 运行 的新任务,并且该任务会将所有事务拆分为更小的块(可能是 10-20?),然后用这些块触发 process_webhook_transactions

如果 handler = WEBHOOK_HANDLERS.get(event, default_handler) 是异步的,我认为拆分块的方法也很好,因为您可以 运行 它并发以提高任务的速度。