如何不使用 Django 信号和 Celery 任务进入无限递归
How not to go into infinite recursion with a Django signal and a Celery task
我正在为 Django 应用程序计费,每当任何值发生变化时,我都应该根据后台任务的预算生成 Pdf。
models.py:
from django_model_changes import ChangesMixin
class Budget(ChangesMixin, models.Model):
pdf = models.FileField(verbose_name="PDF", max_length=200, upload_to='pdf/budgets', null=True, blank=True)
tasks.py:
@shared_task(name='generate_budget_pdf_task')
def generate_budget_pdf_task(budget_id):
budget = get_object_or_404(app_models.Budget, id=id)
concepts = budget.concept_budget.all()
try:
file = app_pdf.PDF(None, budget, concepts)
pdf_file = file.generate_budget()
file_name = 'Pto_' + str(budget.brand.client.name) + '_' + str(budget.name) + '_' + str(budget.code) + ".pdf"
budget.pdf.save(file_name, ContentFile(pdf_file), save=True)
except Exception as e:
print("Error generate_budget_pdf_task")
print(e)
print(type(e))
signals.py:
def generate_budget_pdf_signal(sender, instance, created, **kwargs):
if 'pdf' not in instance.changes():
app_tasks.generate_budget_pdf_task.delay(instance.id)
post_save.connect(generate_budget_pdf_signal, sender=app_models.Budget)
但是任务更新
有谁能帮帮我吗?
提前致谢。
这是解决问题的代码
models.py:
class Budget(ChangesMixin, models.Model):
def save(self, *args, **kwargs):
try:
if self._generated_pdf == False:
self._generated_pdf = True
except:
self._generated_pdf = False
signals.py:
def generate_budget_pdf_signal(sender, instance, created, **kwargs):
if 'pdf' not in instance.changes():
app_tasks.generate_budget_pdf_task.delay(instance.id)
post_save.connect(generate_budget_pdf_signal, 发件人=app_models.Budget)
tasks.py:
@shared_task(name='generate_budget_pdf_task')
def generate_budget_pdf_task(budget_id):
budget = get_object_or_404(app_models.Budget, id=budget_id)
concepts = budget.concept_budget.all()
try:
file = app_pdf.PDF(None, budget, concepts)
pdf_file = file.generate_budget()
file_name = 'Pto_' + str(budget.brand.client.name) + '_' + str(budget.name) + '_' + str(budget.code) + ".pdf"
budget._generated_pdf = True
budget.pdf.save(file_name, ContentFile(pdf_file), save=True)
except Exception as e:
print("Error generate_budget_pdf_task")
print(e)
print(type(e))
我正在为 Django 应用程序计费,每当任何值发生变化时,我都应该根据后台任务的预算生成 Pdf。
models.py:
from django_model_changes import ChangesMixin
class Budget(ChangesMixin, models.Model):
pdf = models.FileField(verbose_name="PDF", max_length=200, upload_to='pdf/budgets', null=True, blank=True)
tasks.py:
@shared_task(name='generate_budget_pdf_task')
def generate_budget_pdf_task(budget_id):
budget = get_object_or_404(app_models.Budget, id=id)
concepts = budget.concept_budget.all()
try:
file = app_pdf.PDF(None, budget, concepts)
pdf_file = file.generate_budget()
file_name = 'Pto_' + str(budget.brand.client.name) + '_' + str(budget.name) + '_' + str(budget.code) + ".pdf"
budget.pdf.save(file_name, ContentFile(pdf_file), save=True)
except Exception as e:
print("Error generate_budget_pdf_task")
print(e)
print(type(e))
signals.py:
def generate_budget_pdf_signal(sender, instance, created, **kwargs):
if 'pdf' not in instance.changes():
app_tasks.generate_budget_pdf_task.delay(instance.id)
post_save.connect(generate_budget_pdf_signal, sender=app_models.Budget)
但是任务更新
有谁能帮帮我吗? 提前致谢。
这是解决问题的代码
models.py:
class Budget(ChangesMixin, models.Model):
def save(self, *args, **kwargs):
try:
if self._generated_pdf == False:
self._generated_pdf = True
except:
self._generated_pdf = False
signals.py:
def generate_budget_pdf_signal(sender, instance, created, **kwargs):
if 'pdf' not in instance.changes():
app_tasks.generate_budget_pdf_task.delay(instance.id)
post_save.connect(generate_budget_pdf_signal, 发件人=app_models.Budget)
tasks.py:
@shared_task(name='generate_budget_pdf_task')
def generate_budget_pdf_task(budget_id):
budget = get_object_or_404(app_models.Budget, id=budget_id)
concepts = budget.concept_budget.all()
try:
file = app_pdf.PDF(None, budget, concepts)
pdf_file = file.generate_budget()
file_name = 'Pto_' + str(budget.brand.client.name) + '_' + str(budget.name) + '_' + str(budget.code) + ".pdf"
budget._generated_pdf = True
budget.pdf.save(file_name, ContentFile(pdf_file), save=True)
except Exception as e:
print("Error generate_budget_pdf_task")
print(e)
print(type(e))