202 Accepted 响应的正确设计模式

Proper design pattern for 202 Accepted responses

我有一个 celery shared_task 设置,如果第一次没有成功,最多重试 10 次。初始日志语句只执行一次。 None 的异常被引发,嵌入的 try/else 也没有。语句 result = LdapHostGroupView().start(data, username, version) 确实执行了,它从日志条目中显示它成功了,但最后的 else 永远不会执行。

这是怎么回事?

@shared_task(bind=True, default_retry_delay=15, max_retry=10)
def host_accepted(self, data, username, version):
    from .api.views import LdapHostGroupView
    name = data.get('name', '')
    version = Decimal(version)
    log.debug("name: %s, version: %s, version type: %s, data: %s",
              name, version, type(version), data)

    try:
        obj = Transaction.objects.get(endpoint_name=name)
    except Transaction.DoesNotExist as e:
        msg = "Could not find transaction '{}'".format(name)
        log.critical(msg)
        syslog.critical(msg)
    else:
        try:
            result = LdapHostGroupView().start(data, username, version)
        except RealmBundleDoesNotExist as e:
            log.debug("Bundle does not exist yet.")
            obj.job_summary += str(e) + '\n'
            obj.job_status = Transaction.INPROGRESS
            obj.save()
            self.retry(exc=e) # ** self.request.retries)
        except (RealmCriticalException, ValidationError) as e:
            error = e.get_full_details()
            log.debug("Host Accepted error: %s", error)

            if isinstance(error, dict):
                for field, values in error.items():
                    for value in values:
                        ed = value.get('message')

                        if isinstance(ed, ErrorDetail):
                            item = str(ed)
                        else:
                            item = value

                        msg = "Field '{}' has error: {}\n".format(field, item)
                        obj.job_summary += msg
            else:
                obj.job_summary += "Had error with no message.\n"

            obj.job_status = Transaction.FAILURE
            obj.save()
        else:
            log.info("Celery task 'host_accepted' executed at %s, "
                     "returned %s, incoming data %s",
                     datetime.now(tzutc()).isoformat(), result, data)

            # Check the result object.

            obj.job_status = Transaction.SUCCESS
            obj.save()

它在 Django 视图中是这样调用的:

   host_accepted.delay(request.data, request.user.username, request.version)

所以在发布我的问题后,我突然想到我上面的代码依赖于我能够重新创建 request 对象或 pickle 对象。这些方法都不可能。所以我需要做的是只将需要一段时间 运行 的代码包装在 celery 任务中。我发现我可以 return 序列化程序的 create 方法中的 celery 任务的结果,而不是普通的 DB 对象。

我应该提一下,这个序列化器在任何情况下都不会 returns 数据库对象,因为它实际上将来自两个外部 APIs 的数据聚合到我的 API 中。我没有显示该代码。

我的视图经过大量自定义,但是,它们的功能与普通视图差不多。

def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.__accepted = False

def post(self, request, *args, **kwargs):
    self.__accepted = False # Use the normal serializer
    self.create(request, *args, **kwargs)
    self.__accepted = True # Use the JobQueue serializer
    return self.create_accepted(request, *args, **kwargs)

def create(self, request, *args, **kwargs):
    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    self.perform_create(serializer)
    # Do not return a Response

def create_accepted(self, request, *args, **kwargs):
    data = {}
    data['endpoint_name'] = request.data.get('name')
    # Add any data needed to create a JobQueue object.
    serializer = self.get_serializer(data=data)
    serializer.is_valid(raise_exception=True)
    self.perform_create(serializer)
    data = serializer.data
    headers = self.get_success_headers(data)
    return Response(data, status=status.HTTP_202_ACCEPTED,
                    headers=headers)

def get_serializer_class(self):
    serializer = None

    if self.__accepted:
        if self.request.version == Decimal("1"):
            serializer = JobQueueSerializerVer01
    else:
        if self.request.version == Decimal("1"):
            serializer = SomeSerializerVer01

    return serializer

现在序列化程序发生了变化:

class SomeSerializerVer01(serializers.Serializer):

    def create(self, validated_data):
        # Call the task
        return wait_for_long_running_code.delay(
            validated_data, self.initial_data)

    def create_after_task(self, validated_data, initial_data):
        self.initial_data = initial_data
        self._create_or_update_job_queue(
            name, job_status=JobQueue.INPROGRESS)
        # Do what you need to do here
        data = {}

        # Update the JobQueue DB object.
        self._create_or_update_job_queue(
            name, job_status=JobQueue.SUCCESS,
            job_ended=datetime.datetime.now(tzutc()))

        return data

    def _create_or_update_job_queue(self, name, **kwargs):
        trx = JobQueue.objects.create_transaction(
            name, Endpoint.HOST_GROUP, self.get_user_object(), **kwargs)
        return trx

现在任务:

@shared_task(bind=True, default_retry_delay=15, max_retry=8)
def wait_for_long_runninf_code(self, validated_data, initial_data):
    from your.path import SomeSerializerVer01
    ser = SomeSerializerVer01()
    result = {}

    try:
       result = ser.create_after_task(validated_data, initial_data)
    except Exception:
       self.retry(exc=e)

    return result

就是这样。我做的一些事情你可能不需要做,比如传递 initial_data 指令。上面并未显示所有内容,例如,JobQueue 数据库对象的视图和序列化程序。