芹菜任务在后续任务中分配本地实例属性

celery task is assigning local instance attributes in subsequent tasks

我有一个任务,每次处理新任务时都会创建一个任务历史记录。我在任务函数的开头实例化了一个新的 TaskHistory 实例。出于某种原因,前一个任务中分配的 TaskHistory class 属性被分配给后续任务中的 TaskHistory class 属性。例如,如果任务成功,我分配:

 task_history.meta['success'] = 'Successfully processed {} rows '.format(row_count)'

在后续任务中它可能会失败,我分配:

task_history.meta['error'] = 'Processed {} rows '.format(row_count) + str(e)

后续任务应该只分配 meta['error'],但即使 task_history 已重新实例化,它也会分配先前的 meta['success'] 值。

下面是调用任务的代码:

args = [file_ids]
kwargs = {'requester': request.user.profile}
csv_import.apply_async(args=args, kwargs=kwargs)

下面是任务函数:

@task
def csv_import(file_ids, requester=None):
    task_history = TaskHistory()
    task_history.requester = requester
    task_history.status = 'pending'
    task_history.started = timezone.now()
    task_history.save()
    row_count = 0
    try:

       //main logic goes here

       task_history.status = 'completed'
       task_history.completed = now()
       task_history.meta['success'] = 'Successfully processed {} rows '.format(row_count)
       task_history.save()
    except Exception as e:
        task_history.status = 'failed'
        task_history.completed = timezone.now()
        task_history.meta['error'] = 'Processed {} rows '.format(row_count) + str(e)
        task_history.save()
        raise Exception

最后这与芹菜无关,而是与我在模型中为元字段设置默认值的方式有关。元字段是 JSONfield() 我将默认值设置为 default={} 它创建了一个可变实例并在所有实例之间共享的JSONfield。我通过将它设置为可调用的 dict 来解决这个问题。新字段现在显示如下:

meta = JSONField(default=dict)

可以在此处找到更多相关信息 https://docs.djangoproject.com/es/1.9/ref/contrib/postgres/fields/#jsonfield。这是应用于所有模型默认值的相同标准。