202 Accepted 响应的正确设计模式
Proper design pattern for 202 Accepted responses
我有一个 celery shared_task
设置,如果第一次没有成功,最多重试 10 次。初始日志语句只执行一次。 None 的异常被引发,嵌入的 try/else
也没有。语句 result = LdapHostGroupView().start(data, username, version)
确实执行了,它从日志条目中显示它成功了,但最后的 else
永远不会执行。
这是怎么回事?
@shared_task(bind=True, default_retry_delay=15, max_retry=10)
def host_accepted(self, data, username, version):
from .api.views import LdapHostGroupView
name = data.get('name', '')
version = Decimal(version)
log.debug("name: %s, version: %s, version type: %s, data: %s",
name, version, type(version), data)
try:
obj = Transaction.objects.get(endpoint_name=name)
except Transaction.DoesNotExist as e:
msg = "Could not find transaction '{}'".format(name)
log.critical(msg)
syslog.critical(msg)
else:
try:
result = LdapHostGroupView().start(data, username, version)
except RealmBundleDoesNotExist as e:
log.debug("Bundle does not exist yet.")
obj.job_summary += str(e) + '\n'
obj.job_status = Transaction.INPROGRESS
obj.save()
self.retry(exc=e) # ** self.request.retries)
except (RealmCriticalException, ValidationError) as e:
error = e.get_full_details()
log.debug("Host Accepted error: %s", error)
if isinstance(error, dict):
for field, values in error.items():
for value in values:
ed = value.get('message')
if isinstance(ed, ErrorDetail):
item = str(ed)
else:
item = value
msg = "Field '{}' has error: {}\n".format(field, item)
obj.job_summary += msg
else:
obj.job_summary += "Had error with no message.\n"
obj.job_status = Transaction.FAILURE
obj.save()
else:
log.info("Celery task 'host_accepted' executed at %s, "
"returned %s, incoming data %s",
datetime.now(tzutc()).isoformat(), result, data)
# Check the result object.
obj.job_status = Transaction.SUCCESS
obj.save()
它在 Django 视图中是这样调用的:
host_accepted.delay(request.data, request.user.username, request.version)
所以在发布我的问题后,我突然想到我上面的代码依赖于我能够重新创建 request
对象或 pickle 对象。这些方法都不可能。所以我需要做的是只将需要一段时间 运行 的代码包装在 celery 任务中。我发现我可以 return 序列化程序的 create
方法中的 celery 任务的结果,而不是普通的 DB 对象。
我应该提一下,这个序列化器在任何情况下都不会 returns 数据库对象,因为它实际上将来自两个外部 APIs 的数据聚合到我的 API 中。我没有显示该代码。
我的视图经过大量自定义,但是,它们的功能与普通视图差不多。
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__accepted = False
def post(self, request, *args, **kwargs):
self.__accepted = False # Use the normal serializer
self.create(request, *args, **kwargs)
self.__accepted = True # Use the JobQueue serializer
return self.create_accepted(request, *args, **kwargs)
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
# Do not return a Response
def create_accepted(self, request, *args, **kwargs):
data = {}
data['endpoint_name'] = request.data.get('name')
# Add any data needed to create a JobQueue object.
serializer = self.get_serializer(data=data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
data = serializer.data
headers = self.get_success_headers(data)
return Response(data, status=status.HTTP_202_ACCEPTED,
headers=headers)
def get_serializer_class(self):
serializer = None
if self.__accepted:
if self.request.version == Decimal("1"):
serializer = JobQueueSerializerVer01
else:
if self.request.version == Decimal("1"):
serializer = SomeSerializerVer01
return serializer
现在序列化程序发生了变化:
class SomeSerializerVer01(serializers.Serializer):
def create(self, validated_data):
# Call the task
return wait_for_long_running_code.delay(
validated_data, self.initial_data)
def create_after_task(self, validated_data, initial_data):
self.initial_data = initial_data
self._create_or_update_job_queue(
name, job_status=JobQueue.INPROGRESS)
# Do what you need to do here
data = {}
# Update the JobQueue DB object.
self._create_or_update_job_queue(
name, job_status=JobQueue.SUCCESS,
job_ended=datetime.datetime.now(tzutc()))
return data
def _create_or_update_job_queue(self, name, **kwargs):
trx = JobQueue.objects.create_transaction(
name, Endpoint.HOST_GROUP, self.get_user_object(), **kwargs)
return trx
现在任务:
@shared_task(bind=True, default_retry_delay=15, max_retry=8)
def wait_for_long_runninf_code(self, validated_data, initial_data):
from your.path import SomeSerializerVer01
ser = SomeSerializerVer01()
result = {}
try:
result = ser.create_after_task(validated_data, initial_data)
except Exception:
self.retry(exc=e)
return result
就是这样。我做的一些事情你可能不需要做,比如传递 initial_data
指令。上面并未显示所有内容,例如,JobQueue
数据库对象的视图和序列化程序。
我有一个 celery shared_task
设置,如果第一次没有成功,最多重试 10 次。初始日志语句只执行一次。 None 的异常被引发,嵌入的 try/else
也没有。语句 result = LdapHostGroupView().start(data, username, version)
确实执行了,它从日志条目中显示它成功了,但最后的 else
永远不会执行。
这是怎么回事?
@shared_task(bind=True, default_retry_delay=15, max_retry=10)
def host_accepted(self, data, username, version):
from .api.views import LdapHostGroupView
name = data.get('name', '')
version = Decimal(version)
log.debug("name: %s, version: %s, version type: %s, data: %s",
name, version, type(version), data)
try:
obj = Transaction.objects.get(endpoint_name=name)
except Transaction.DoesNotExist as e:
msg = "Could not find transaction '{}'".format(name)
log.critical(msg)
syslog.critical(msg)
else:
try:
result = LdapHostGroupView().start(data, username, version)
except RealmBundleDoesNotExist as e:
log.debug("Bundle does not exist yet.")
obj.job_summary += str(e) + '\n'
obj.job_status = Transaction.INPROGRESS
obj.save()
self.retry(exc=e) # ** self.request.retries)
except (RealmCriticalException, ValidationError) as e:
error = e.get_full_details()
log.debug("Host Accepted error: %s", error)
if isinstance(error, dict):
for field, values in error.items():
for value in values:
ed = value.get('message')
if isinstance(ed, ErrorDetail):
item = str(ed)
else:
item = value
msg = "Field '{}' has error: {}\n".format(field, item)
obj.job_summary += msg
else:
obj.job_summary += "Had error with no message.\n"
obj.job_status = Transaction.FAILURE
obj.save()
else:
log.info("Celery task 'host_accepted' executed at %s, "
"returned %s, incoming data %s",
datetime.now(tzutc()).isoformat(), result, data)
# Check the result object.
obj.job_status = Transaction.SUCCESS
obj.save()
它在 Django 视图中是这样调用的:
host_accepted.delay(request.data, request.user.username, request.version)
所以在发布我的问题后,我突然想到我上面的代码依赖于我能够重新创建 request
对象或 pickle 对象。这些方法都不可能。所以我需要做的是只将需要一段时间 运行 的代码包装在 celery 任务中。我发现我可以 return 序列化程序的 create
方法中的 celery 任务的结果,而不是普通的 DB 对象。
我应该提一下,这个序列化器在任何情况下都不会 returns 数据库对象,因为它实际上将来自两个外部 APIs 的数据聚合到我的 API 中。我没有显示该代码。
我的视图经过大量自定义,但是,它们的功能与普通视图差不多。
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__accepted = False
def post(self, request, *args, **kwargs):
self.__accepted = False # Use the normal serializer
self.create(request, *args, **kwargs)
self.__accepted = True # Use the JobQueue serializer
return self.create_accepted(request, *args, **kwargs)
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
# Do not return a Response
def create_accepted(self, request, *args, **kwargs):
data = {}
data['endpoint_name'] = request.data.get('name')
# Add any data needed to create a JobQueue object.
serializer = self.get_serializer(data=data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
data = serializer.data
headers = self.get_success_headers(data)
return Response(data, status=status.HTTP_202_ACCEPTED,
headers=headers)
def get_serializer_class(self):
serializer = None
if self.__accepted:
if self.request.version == Decimal("1"):
serializer = JobQueueSerializerVer01
else:
if self.request.version == Decimal("1"):
serializer = SomeSerializerVer01
return serializer
现在序列化程序发生了变化:
class SomeSerializerVer01(serializers.Serializer):
def create(self, validated_data):
# Call the task
return wait_for_long_running_code.delay(
validated_data, self.initial_data)
def create_after_task(self, validated_data, initial_data):
self.initial_data = initial_data
self._create_or_update_job_queue(
name, job_status=JobQueue.INPROGRESS)
# Do what you need to do here
data = {}
# Update the JobQueue DB object.
self._create_or_update_job_queue(
name, job_status=JobQueue.SUCCESS,
job_ended=datetime.datetime.now(tzutc()))
return data
def _create_or_update_job_queue(self, name, **kwargs):
trx = JobQueue.objects.create_transaction(
name, Endpoint.HOST_GROUP, self.get_user_object(), **kwargs)
return trx
现在任务:
@shared_task(bind=True, default_retry_delay=15, max_retry=8)
def wait_for_long_runninf_code(self, validated_data, initial_data):
from your.path import SomeSerializerVer01
ser = SomeSerializerVer01()
result = {}
try:
result = ser.create_after_task(validated_data, initial_data)
except Exception:
self.retry(exc=e)
return result
就是这样。我做的一些事情你可能不需要做,比如传递 initial_data
指令。上面并未显示所有内容,例如,JobQueue
数据库对象的视图和序列化程序。