Django Celery IntegrityError

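I want to create a progress bar for my project. I have a class, and this class has several methods. One of them in particular (def download_all) takes a long time, which is the main reason I want a progress bar.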

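I have successfully set up celery, celery-progress, etc., and they work fine. My problem is that I want to integrate the progress bar into the download_all function.

It gives an error: IntegrityError at /o.../k... NOT NULL constraint failed: django_celery_results_taskresult.task_id

How can I solve this?

functions.py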

class myClass():

    def __init__(self, n_user, n_password, n_url, n_port, db_password, username):
        ...
        self.download_all(n_user, n_password, n_url, n_port, db_password)
        ...

    @shared_task(bind=True, name="my_add")
    def download_all(n_user, n_password, n_url, n_port, db_password)
     ...
     len_scans = len(scans)
     progress_recorder = ProgressRecorder(self)
     for s in scans:
         i = 0
         progress_recorder.set_progress(i + 1, len_scans)
         i += 1

views.py

def setup_wizard(request):
  ...
  functions.Zafiyet(setup.n_username, setup.n_password,
                                          setup.n_url, setup.n_port, setup.db_password,
                                          username=request.user.username)  

Traceback

Environment:


Request Method: POST
Request URL: http://127.0.0.1:8000/operasyonmerkezi/konfigurasyon

Django Version: 3.2.7
Python Version: 3.9.6
Installed Applications:
['django.contrib.admin',
 'django.contrib.auth',
 'django.contrib.contenttypes',
 'django.contrib.sessions',
 'django.contrib.messages',
 'django.contrib.staticfiles',
 'dashboard',
 'accounts',
 'logs',
 'crispy_forms',
 'django_apscheduler',
 'easy_timezones',
 'django_celery_results',
 'celery_progress']
Installed Middleware:
['django.middleware.security.SecurityMiddleware',
 'django.contrib.sessions.middleware.SessionMiddleware',
 'django.middleware.common.CommonMiddleware',
 'django.middleware.csrf.CsrfViewMiddleware',
 'django.contrib.auth.middleware.AuthenticationMiddleware',
 'django.contrib.messages.middleware.MessageMiddleware',
 'django.middleware.clickjacking.XFrameOptionsMiddleware']



Traceback (most recent call last):
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 581, in get_or_create
    return self.get(**kwargs), False
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 435, in get
    raise self.model.DoesNotExist(

During handling of the above exception (TaskResult matching query does not exist.), another exception occurred:
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
    return self.cursor.execute(sql, params)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\sqlite3\base.py", line 423, in execute
    return Database.Cursor.execute(self, query, params)

The above exception (NOT NULL constraint failed: django_celery_results_taskresult.task_id) was the direct cause of the following exception:
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\core\handlers\exception.py", line 47, in inner
    response = get_response(request)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\core\handlers\base.py", line 181, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\contrib\auth\decorators.py", line 21, in _wrapped_view
    return view_func(request, *args, **kwargs)
  File "C:\Users\edeni\Desktop\hawkdragon\dashboard\views.py", line 128, in setup_wizard
    task = (functions.myClass(setup.n_username, setup.n_password,
  File "C:\Users\edeni\Desktop\hawkdragon\dashboard\functions.py", line 44, in __init__
    self.download_all(n_user, n_password, n_url, n_port, db_password)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\local.py", line 188, in __call__
    return self._get_current_object()(*a, **kw)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\app\task.py", line 389, in __call__
    return self.run(*args, **kwargs)
  File "C:\Users\edeni\Desktop\hawkdragon\dashboard\functions.py", line 153, in download_all
    progress_recorder.set_progress(i + 1, len_scans)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery_progress\backend.py", line 46, in set_progress
    self.task.update_state(
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\app\task.py", line 971, in update_state
    self.backend.store_result(
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\backends\base.py", line 482, in store_result
    self._store_result(task_id, result, state, traceback,
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\backends\database.py", line 66, in _store_result
    self.TaskModel._default_manager.store_result(
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\managers.py", line 46, in _inner
    return fun(*args, **kwargs)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\managers.py", line 168, in store_result
    obj, created = self.using(using).get_or_create(task_id=task_id,
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 588, in get_or_create
    return self.create(**params), True
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 453, in create
    obj.save(force_insert=True, using=self.db)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 726, in save
    self.save_base(using=using, force_insert=force_insert,
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 763, in save_base
    updated = self._save_table(
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 868, in _save_table
    results = self._do_insert(cls._base_manager, using, fields, returning_fields, raw)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 906, in _do_insert
    return manager._insert(
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\manager.py", line 85, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 1270, in _insert
    return query.get_compiler(using=using).execute_sql(returning_fields)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\sql\compiler.py", line 1416, in execute_sql
    cursor.execute(sql, params)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 98, in execute
    return super().execute(sql, params)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 66, in execute
    return self._execute_with_wrappers(sql, params, many=False, executor=self._execute)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 75, in _execute_with_wrappers
    return executor(sql, params, many, context)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
    return self.cursor.execute(sql, params)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\utils.py", line 90, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
    return self.cursor.execute(sql, params)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\sqlite3\base.py", line 423, in execute
    return Database.Cursor.execute(self, query, params)

Exception Type: IntegrityError at /operasyonmerkezi/konfigurasyon
Exception Value: NOT NULL constraint failed: django_celery_results_taskresult.task_id

This seems to come from the Django model: "self.model.DoesNotExist".

It does not come from Celery but from Model.objects.

You had better check your ORM usage.

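Don't call self.download_all(n_user, n_password, n_url, n_port, db_password) directly.

Instead, run the function as a Celery task with self.download_all.delay(n_user, n_password, n_url, n_port, db_password).

You can use a websocket to send the progress data to the HTML page and update the progress bar with JavaScript.

The progress bar then moves every time an update arrives.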
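One detail about reporting progress from inside a loop: the counter has to live outside the loop body (or come from enumerate), otherwise every update reports the same step. Below is a minimal sketch of that pattern. `RecordingRecorder` and `process_scans` are hypothetical names, and the recorder is only a stand-in for celery-progress's `ProgressRecorder` so that the loop can run without a worker; only the `set_progress(current, total)` call shape is taken from the question.

```python
class RecordingRecorder:
    """Hypothetical stand-in for celery_progress's ProgressRecorder.

    It only remembers the (current, total) pairs it is given, so the
    loop logic can be checked without a Celery worker.
    """

    def __init__(self):
        self.updates = []

    def set_progress(self, current, total):
        self.updates.append((current, total))


def process_scans(scans, progress_recorder):
    """Report one progress update per processed scan."""
    total = len(scans)
    # enumerate(start=1) keeps the counter outside the loop body, so it
    # is never reset between iterations.
    for done, scan in enumerate(scans, start=1):
        # ... the real per-scan download work would go here ...
        progress_recorder.set_progress(done, total)


recorder = RecordingRecorder()
process_scans(["scan-a", "scan-b", "scan-c"], recorder)
print(recorder.updates)
```

Inside a real task, the same loop would receive an actual ProgressRecorder instead of the stand-in.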

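If you want to see the progress of a task, you have 3 options:

  1. Store the current progress as a model field in the database
  2. Store it as a key-value pair in Redis
  3. Compute it at runtime by counting the database rows whose status has changed, using a filter

The frontend can fetch this result through a plain REST API or a websocket.

Minimal configuration example for a REST API
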

model.py

from django.db import models


class MyLongProcess(models.Model):
    active_uuid = models.UUIDField('Active process', null=True, blank=True)
    name = models.CharField('Name', max_length=255)
    current_step = models.IntegerField('Current step', default=0)
    total = models.IntegerField('Total', default=0)

    @property
    def percentage_sending(self):
        # or it can be computed by filtering elements processed in celery with complete status
        if not self.total:
            return 0  # avoid dividing by zero before total is set
        return int((self.current_step / self.total) * 100)

views.py

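import uuid

from django.http import HttpResponse

from app.celery import app
from .models import MyLongProcess  # adjust to the module that defines the model


def initiate_execute_long_task(request, name):
    # BIG_COUNTED_VALUE is a placeholder for the number of steps to run
    process = MyLongProcess.objects.create(active_uuid=uuid.uuid4(), name=name, total=BIG_COUNTED_VALUE)
    # pass the primary key, not the model instance: task arguments must be serializable (JSON by default)
    async_execute_long_task.delay(process.pk)
    return HttpResponse()


@app.task
def async_execute_long_task(process_id):
    process = MyLongProcess.objects.get(pk=process_id)
    for i in range(process.total):
        do_some_staff()  # placeholder for the real work
        process.current_step += 1
        process.save(update_fields=['current_step'])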

detail.html

<div class="progress-bar" role="progressbar"
  style="width: {{ my_model_object.percentage_sending }}%;"
  aria-valuenow="{{ my_model_object.percentage_sending }}"
  aria-valuemin="0" aria-valuemax="100">{{ my_model_object.percentage_sending }}%
</div>
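
The percentage calculation is the part of this example most likely to fail at runtime, for instance while total is still 0, so it can help to check it in isolation. Here is a sketch in plain Python; the helper name `percentage_done` is hypothetical and not part of the code above.

```python
def percentage_done(current_step: int, total: int) -> int:
    """Return progress as a whole percentage between 0 and 100."""
    if total <= 0:
        # Nothing to do yet (or total not set): avoid ZeroDivisionError.
        return 0
    # Integer arithmetic avoids float rounding; clamp so an over-counted
    # step never reports more than 100.
    return min(100, current_step * 100 // total)


print(percentage_done(0, 0))
print(percentage_done(3, 12))
print(percentage_done(12, 12))
```

A model property such as percentage_sending could delegate to a helper like this, passing self.current_step and self.total.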

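First, you probably don't want to use the @shared_task decorator on a method like this.

As written, your method signature is somewhat broken. The bind=True argument causes Celery to pass the task object into the call, which can produce unexpected results depending on your signature.

To demonstrate what I mean: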

@shared_task(bind=True, name='mytask')
def my_task(the_task, a, b):
    print(the_task)  # a celery Task object
    return a + b
# OR
@shared_task(name='mytask')
def my_task(a, b):
    return a + b

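Note that these are plain functions, not methods on a class.

Now combine that with the fact that methods on classes implicitly receive self, the instance of the class, as their first argument. That is also missing from your signature. Honestly, it is surprising that it executes at all as written... based on the code you provided, I would have expected it to fail with a TypeError.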
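
Why no instance arrives in the call can be reproduced without Celery. In plain Python, only functions (and other descriptors) are turned into bound methods on attribute access, so a decorator that replaces the function with an ordinary callable object loses that binding. The sketch below uses a hypothetical `FakeTask` wrapper as a stand-in for the object a task decorator returns. It is not Celery's actual class, and the claim that Celery's wrapper behaves the same way is an assumption based on the traceback in the question.

```python
class FakeTask:
    """Hypothetical stand-in for a task object; not Celery's real class."""

    def __init__(self, func, bind=False):
        self.func = func
        self.bind = bind

    def __call__(self, *args, **kwargs):
        if self.bind:
            # Mimic bind=True: the task object itself becomes the first argument.
            return self.func(self, *args, **kwargs)
        return self.func(*args, **kwargs)


def fake_task(bind=False):
    """Decorator factory that replaces the function with a FakeTask."""
    def decorator(func):
        return FakeTask(func, bind=bind)
    return decorator


class Downloader:
    @fake_task(bind=True)
    def download_all(first_arg, url):
        # first_arg is the FakeTask, not a Downloader instance.
        return type(first_arg).__name__, url


d = Downloader()
# FakeTask defines no __get__, so `d` is not passed implicitly.
print(d.download_all("http://example.invalid"))
```

In this sketch the first parameter receives the wrapper object and the instance never reaches the function, which matches the signature mismatch described above.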

The fix is to extract the task into its own function, or to first add the @staticmethod decorator to the method:

class Foo:
    @shared_task(name='mytask')
    @staticmethod
    def bar(a, b):
        ...

Going back to your stack trace, your error comes from an integrity error in the task results table:

Exception Value: NOT NULL constraint failed: django_celery_results_taskresult.task_id

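This happens because your application is, for some reason, trying to reuse a task ID and store a result that has already been executed and saved in the database. It could be caused by the issue above, by a faulty concurrency model in your application, or by out-of-band changes to the database or queue that lead to unexpected duplicates. You will need to purge the offending task from the message queue or delete the offending result from the database, but nothing guarantees that it won't happen again unless you fix the underlying problem in your application code.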