Django Celery Task: AttributeError: type object 'Command' has no attribute '__annotations__'
Django Celery Task: AttributeError: type object 'Command' has no attribute '__annotations__'
我正在尝试设置一个 celery 任务,它基本上是 运行 一个脚本,用于停用 json 文件中的一系列设施。我曾经手动 运行 它并且它 运行 很好但是现在当 Celery 尝试 运行 它作为一项任务时我收到以下错误:
AttributeError: type object 'Command' has no attribute '__annotations__'
谁能告诉我为什么它不起作用?
这是我正在使用的代码:
tasks.py
import requests
import json
from project.users.models import Facility
from django.core.management.base import BaseCommand
from config import celery_app
IMPORT_URL = 'https://example.com/file.json'
@celery_app.task()
class Command(BaseCommand):
def delete_facility(self, data):
data = data
if Facility.objects.filter(UUID=data, ReportedStatusField='closed'):
print("Facility status already set to closed")
else:
facility, facility_closed = Facility.objects.update_or_create(UUID=data,
defaults={
'ReportedStatusField': 'closed'
}
)
print("Facility status set to closed")
def handle(self, *args, **options):
headers = {'Content-Type': 'application/json'}
response = requests.get(
url=IMPORT_URL,
headers=headers,
)
response.raise_for_status()
data = response.json()
for data, data_object in data.items():
if data in ["deleted"]:
for data in data_object:
self.delete_facility(data)
我想我明白了。现在是运行。我从 tasks.py 调用基本命令文件,而不是将代码直接添加到 tasks.py:
from celery import shared_task
from django.core.management import call_command
@shared_task
def start_import():
call_command("import_from_url", )
我正在尝试设置一个 celery 任务,它基本上是 运行 一个脚本,用于停用 json 文件中的一系列设施。我曾经手动 运行 它并且它 运行 很好但是现在当 Celery 尝试 运行 它作为一项任务时我收到以下错误:
AttributeError: type object 'Command' has no attribute '__annotations__'
谁能告诉我为什么它不起作用? 这是我正在使用的代码:
tasks.py
import requests
import json
from project.users.models import Facility
from django.core.management.base import BaseCommand
from config import celery_app
IMPORT_URL = 'https://example.com/file.json'
@celery_app.task()
class Command(BaseCommand):
def delete_facility(self, data):
data = data
if Facility.objects.filter(UUID=data, ReportedStatusField='closed'):
print("Facility status already set to closed")
else:
facility, facility_closed = Facility.objects.update_or_create(UUID=data,
defaults={
'ReportedStatusField': 'closed'
}
)
print("Facility status set to closed")
def handle(self, *args, **options):
headers = {'Content-Type': 'application/json'}
response = requests.get(
url=IMPORT_URL,
headers=headers,
)
response.raise_for_status()
data = response.json()
for data, data_object in data.items():
if data in ["deleted"]:
for data in data_object:
self.delete_facility(data)
我想我明白了。现在是运行。我从 tasks.py 调用基本命令文件,而不是将代码直接添加到 tasks.py:
from celery import shared_task
from django.core.management import call_command
@shared_task
def start_import():
call_command("import_from_url", )