Django Celery IntegrityError
Django Celery IntegrityError
我想为我的项目创建一个进度条。我有一个 class 而这个 class 有一些功能。特别是,其中一个需要很长时间(def download_all
),这是我想要创建进度条的主要原因。
我成功设置了 celery、celery-progress 等,它们工作正常。我的问题是:我想将进度条集成到 download_all
函数中。我
它给出一个错误:IntegrityError at /o.../k...
NOT NULL 约束失败:django_celery_results_taskresult.task_id
我该如何解决?
functions.py
class myClass():
def __init__(self, n_user, n_password, n_url, n_port, db_password, username):
...
self.download_all(n_user, n_password, n_url, n_port, db_password)
...
@shared_task(bind=True, name="my_add")
def download_all(n_user, n_password, n_url, n_port, db_password)
...
len_scans = len(scans)
progress_recorder = ProgressRecorder(self)
for s in scans:
i = 0
progress_recorder.set_progress(i + 1, len_scans)
i += 1
views.py
def setup_wizard(request):
...
functions.Zafiyet(setup.n_username, setup.n_password,
setup.n_url, setup.n_port, setup.db_password,
username=request.user.username)
回溯
Environment:
Request Method: POST
Request URL: http://127.0.0.1:8000/operasyonmerkezi/konfigurasyon
Django Version: 3.2.7
Python Version: 3.9.6
Installed Applications:
['django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'dashboard',
'accounts',
'logs',
'crispy_forms',
'django_apscheduler',
'easy_timezones',
'django_celery_results',
'celery_progress']
Installed Middleware:
['django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware']
Traceback (most recent call last):
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 581, in get_or_create
return self.get(**kwargs), False
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 435, in get
raise self.model.DoesNotExist(
During handling of the above exception (TaskResult matching query does not exist.), another exception occurred:
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
return self.cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\sqlite3\base.py", line 423, in execute
return Database.Cursor.execute(self, query, params)
The above exception (NOT NULL constraint failed: django_celery_results_taskresult.task_id) was the direct cause of the following exception:
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\core\handlers\exception.py", line 47, in inner
response = get_response(request)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\core\handlers\base.py", line 181, in _get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\contrib\auth\decorators.py", line 21, in _wrapped_view
return view_func(request, *args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\dashboard\views.py", line 128, in setup_wizard
task = (functions.myClass(setup.n_username, setup.n_password,
File "C:\Users\edeni\Desktop\hawkdragon\dashboard\functions.py", line 44, in __init__
self.download_all(n_user, n_password, n_url, n_port, db_password)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\local.py", line 188, in __call__
return self._get_current_object()(*a, **kw)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\app\task.py", line 389, in __call__
return self.run(*args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\dashboard\functions.py", line 153, in download_all
progress_recorder.set_progress(i + 1, len_scans)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery_progress\backend.py", line 46, in set_progress
self.task.update_state(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\app\task.py", line 971, in update_state
self.backend.store_result(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\backends\base.py", line 482, in store_result
self._store_result(task_id, result, state, traceback,
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\backends\database.py", line 66, in _store_result
self.TaskModel._default_manager.store_result(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\managers.py", line 46, in _inner
return fun(*args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\managers.py", line 168, in store_result
obj, created = self.using(using).get_or_create(task_id=task_id,
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 588, in get_or_create
return self.create(**params), True
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 453, in create
obj.save(force_insert=True, using=self.db)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 726, in save
self.save_base(using=using, force_insert=force_insert,
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 763, in save_base
updated = self._save_table(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 868, in _save_table
results = self._do_insert(cls._base_manager, using, fields, returning_fields, raw)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 906, in _do_insert
return manager._insert(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\manager.py", line 85, in manager_method
return getattr(self.get_queryset(), name)(*args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 1270, in _insert
return query.get_compiler(using=using).execute_sql(returning_fields)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\sql\compiler.py", line 1416, in execute_sql
cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 98, in execute
return super().execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 66, in execute
return self._execute_with_wrappers(sql, params, many=False, executor=self._execute)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 75, in _execute_with_wrappers
return executor(sql, params, many, context)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
return self.cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\utils.py", line 90, in __exit__
raise dj_exc_value.with_traceback(traceback) from exc_value
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
return self.cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\sqlite3\base.py", line 423, in execute
return Database.Cursor.execute(self, query, params)
Exception Type: IntegrityError at /operasyonmerkezi/konfigurasyon
Exception Value: NOT NULL constraint failed: django_celery_results_taskresult.task_id
这似乎来自 Django 模型:“self.model.DoesNotExist”。
它不是来自芹菜,而是来自Model.objects
。
你最好检查一下你的 ORM。
不使用self.download_all(n_user, n_password, n_url, n_port, db_password)
但在 celery 任务中使用它 self.download_all.delay(n_user, n_password, n_url, n_port, db_password)
到 运行 函数
您可以使用 websocket
并将进度数据发送到 html 并使用 javascript
更新进度条
每次更新进度条都会移动
如果您想查看任务进度,您有 3 个选项
- 将当前进度存储为数据库中的模型字段
- 在 redis 数据库中存储为键值
- 在运行时通过过滤和状态改变的数据库中的元素计数
前端可以通过普通RESTAPI或者websocket得到这个结果。
REST 的最低配置示例 API
model.py
class MyLongProcess(models.Model):
active_uuid = models.UUIDField('Active process', null=True, blank=True)
name = models.CharField('Name', max_length=255)
current_step = models.IntegerField('Current step', default=0)
total = models.IntegerField('Total', default=0)
@property
def percentage_sending(self):
# or it can be computed by filtering elements processed in celery with complete status
return int((current_step / total) * 100)
views.py
def initiate_execute_long_task(request, name):
process = MyLongProcess.objects.create(active_uuid=uuid.uuid4(), name=name, total=BIG_COUNTED_VALUE)
async_execute_long_task.delay(process)
return HttpResponse()
from app.celery import app
@app.task
def async_execute_long_task(process)
for i in range process.total:
do_some_staff()
process.current_step += 1
process.save()
detail.html
<div class="progress-bar" role="progressbar"
style="width: {{ my_model_object.percentage_sending }}%;"
aria-valuenow="{{ my_model_object.percentage_sending }}"
aria-valuemin="0" aria-valuemax="100">{{ my_model_object.percentage_sending }}%
</div>
首先,您可能不想在这样的方法上使用 @shared_task
装饰器。
因此,您的方法签名有些损坏。 bind=True
参数将导致 celery 将任务对象传递到调用中,这可能会根据您的签名产生意外结果。
证明我的意思:
@shared_task(bind=True, name='mytask')
def my_task(the_task, a, b):
print(the_task) # a celery Task object
return a + b
# OR
@shared_task(name='mytask')
def my_task(a, b):
return a + b
请注意,这些是普通函数,而不是 class 中的方法。
现在,结合 classes 上的方法隐式接收 self
,class 的实例作为第一个参数这一事实。您的签名中也缺少此内容。老实说,它完全按照编写的方式执行,这真是令人惊讶……根据您提供的代码,我本以为它会失败并显示 TypeError
。
相反,您应该将任务提取到它自己的函数中,或者首先在方法上添加 @staticmethod
装饰器:
class Foo:
@shared_task(name='mytask')
@staticmethod
def bar(a, b):
...
回溯您的堆栈跟踪,您的错误来自任务结果中的完整性错误 table:
Exception Value: NOT NULL constraint failed: django_celery_results_taskresult.task_id
发生这种情况是因为您的应用出于某种原因尝试重新使用任务 ID 并尝试存储已执行并存储在数据库中的结果。这可能是由于上述问题、您的应用程序中错误的并发模型或导致意外重复的 database/queue 的带外更改引起的。您需要将有问题的任务从消息队列中清除或从数据库中删除有问题的结果,但可能无法保证这种情况不会再次发生,除非您使用应用程序代码修复潜在问题。
我想为我的项目创建一个进度条。我有一个 class 而这个 class 有一些功能。特别是,其中一个需要很长时间(def download_all
),这是我想要创建进度条的主要原因。
我成功设置了 celery、celery-progress 等,它们工作正常。我的问题是:我想将进度条集成到 download_all
函数中。我
它给出一个错误:IntegrityError at /o.../k... NOT NULL 约束失败:django_celery_results_taskresult.task_id
我该如何解决? functions.py
class myClass():
def __init__(self, n_user, n_password, n_url, n_port, db_password, username):
...
self.download_all(n_user, n_password, n_url, n_port, db_password)
...
@shared_task(bind=True, name="my_add")
def download_all(n_user, n_password, n_url, n_port, db_password)
...
len_scans = len(scans)
progress_recorder = ProgressRecorder(self)
for s in scans:
i = 0
progress_recorder.set_progress(i + 1, len_scans)
i += 1
views.py
def setup_wizard(request):
...
functions.Zafiyet(setup.n_username, setup.n_password,
setup.n_url, setup.n_port, setup.db_password,
username=request.user.username)
回溯
Environment:
Request Method: POST
Request URL: http://127.0.0.1:8000/operasyonmerkezi/konfigurasyon
Django Version: 3.2.7
Python Version: 3.9.6
Installed Applications:
['django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'dashboard',
'accounts',
'logs',
'crispy_forms',
'django_apscheduler',
'easy_timezones',
'django_celery_results',
'celery_progress']
Installed Middleware:
['django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware']
Traceback (most recent call last):
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 581, in get_or_create
return self.get(**kwargs), False
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 435, in get
raise self.model.DoesNotExist(
During handling of the above exception (TaskResult matching query does not exist.), another exception occurred:
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
return self.cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\sqlite3\base.py", line 423, in execute
return Database.Cursor.execute(self, query, params)
The above exception (NOT NULL constraint failed: django_celery_results_taskresult.task_id) was the direct cause of the following exception:
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\core\handlers\exception.py", line 47, in inner
response = get_response(request)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\core\handlers\base.py", line 181, in _get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\contrib\auth\decorators.py", line 21, in _wrapped_view
return view_func(request, *args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\dashboard\views.py", line 128, in setup_wizard
task = (functions.myClass(setup.n_username, setup.n_password,
File "C:\Users\edeni\Desktop\hawkdragon\dashboard\functions.py", line 44, in __init__
self.download_all(n_user, n_password, n_url, n_port, db_password)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\local.py", line 188, in __call__
return self._get_current_object()(*a, **kw)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\app\task.py", line 389, in __call__
return self.run(*args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\dashboard\functions.py", line 153, in download_all
progress_recorder.set_progress(i + 1, len_scans)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery_progress\backend.py", line 46, in set_progress
self.task.update_state(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\app\task.py", line 971, in update_state
self.backend.store_result(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\backends\base.py", line 482, in store_result
self._store_result(task_id, result, state, traceback,
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\backends\database.py", line 66, in _store_result
self.TaskModel._default_manager.store_result(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\managers.py", line 46, in _inner
return fun(*args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\managers.py", line 168, in store_result
obj, created = self.using(using).get_or_create(task_id=task_id,
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 588, in get_or_create
return self.create(**params), True
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 453, in create
obj.save(force_insert=True, using=self.db)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 726, in save
self.save_base(using=using, force_insert=force_insert,
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 763, in save_base
updated = self._save_table(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 868, in _save_table
results = self._do_insert(cls._base_manager, using, fields, returning_fields, raw)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 906, in _do_insert
return manager._insert(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\manager.py", line 85, in manager_method
return getattr(self.get_queryset(), name)(*args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 1270, in _insert
return query.get_compiler(using=using).execute_sql(returning_fields)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\sql\compiler.py", line 1416, in execute_sql
cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 98, in execute
return super().execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 66, in execute
return self._execute_with_wrappers(sql, params, many=False, executor=self._execute)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 75, in _execute_with_wrappers
return executor(sql, params, many, context)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
return self.cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\utils.py", line 90, in __exit__
raise dj_exc_value.with_traceback(traceback) from exc_value
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
return self.cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\sqlite3\base.py", line 423, in execute
return Database.Cursor.execute(self, query, params)
Exception Type: IntegrityError at /operasyonmerkezi/konfigurasyon
Exception Value: NOT NULL constraint failed: django_celery_results_taskresult.task_id
这似乎来自 Django 模型:“self.model.DoesNotExist”。
它不是来自芹菜,而是来自Model.objects
。
你最好检查一下你的 ORM。
不使用self.download_all(n_user, n_password, n_url, n_port, db_password)
但在 celery 任务中使用它 self.download_all.delay(n_user, n_password, n_url, n_port, db_password)
到 运行 函数
您可以使用 websocket
并将进度数据发送到 html 并使用 javascript
每次更新进度条都会移动
如果您想查看任务进度,您有 3 个选项
- 将当前进度存储为数据库中的模型字段
- 在 redis 数据库中存储为键值
- 在运行时通过过滤和状态改变的数据库中的元素计数
前端可以通过普通RESTAPI或者websocket得到这个结果。
REST 的最低配置示例 API
model.py
class MyLongProcess(models.Model):
active_uuid = models.UUIDField('Active process', null=True, blank=True)
name = models.CharField('Name', max_length=255)
current_step = models.IntegerField('Current step', default=0)
total = models.IntegerField('Total', default=0)
@property
def percentage_sending(self):
# or it can be computed by filtering elements processed in celery with complete status
return int((current_step / total) * 100)
views.py
def initiate_execute_long_task(request, name):
process = MyLongProcess.objects.create(active_uuid=uuid.uuid4(), name=name, total=BIG_COUNTED_VALUE)
async_execute_long_task.delay(process)
return HttpResponse()
from app.celery import app
@app.task
def async_execute_long_task(process)
for i in range process.total:
do_some_staff()
process.current_step += 1
process.save()
detail.html
<div class="progress-bar" role="progressbar"
style="width: {{ my_model_object.percentage_sending }}%;"
aria-valuenow="{{ my_model_object.percentage_sending }}"
aria-valuemin="0" aria-valuemax="100">{{ my_model_object.percentage_sending }}%
</div>
首先,您可能不想在这样的方法上使用 @shared_task
装饰器。
因此,您的方法签名有些损坏。 bind=True
参数将导致 celery 将任务对象传递到调用中,这可能会根据您的签名产生意外结果。
证明我的意思:
@shared_task(bind=True, name='mytask')
def my_task(the_task, a, b):
print(the_task) # a celery Task object
return a + b
# OR
@shared_task(name='mytask')
def my_task(a, b):
return a + b
请注意,这些是普通函数,而不是 class 中的方法。
现在,结合 classes 上的方法隐式接收 self
,class 的实例作为第一个参数这一事实。您的签名中也缺少此内容。老实说,它完全按照编写的方式执行,这真是令人惊讶……根据您提供的代码,我本以为它会失败并显示 TypeError
。
相反,您应该将任务提取到它自己的函数中,或者首先在方法上添加 @staticmethod
装饰器:
class Foo:
@shared_task(name='mytask')
@staticmethod
def bar(a, b):
...
回溯您的堆栈跟踪,您的错误来自任务结果中的完整性错误 table:
Exception Value: NOT NULL constraint failed: django_celery_results_taskresult.task_id
发生这种情况是因为您的应用出于某种原因尝试重新使用任务 ID 并尝试存储已执行并存储在数据库中的结果。这可能是由于上述问题、您的应用程序中错误的并发模型或导致意外重复的 database/queue 的带外更改引起的。您需要将有问题的任务从消息队列中清除或从数据库中删除有问题的结果,但可能无法保证这种情况不会再次发生,除非您使用应用程序代码修复潜在问题。