Celery/redis tasks don't always complete - not sure why or how to fix it

I am running celery v 4.0.3 / redis v 4.09 in a django v 3.0.1 app (Python v 3.6.9). I also use face_recognition in a celery task, find_faces, to find the faces in images I upload to the app, along with other celery image-processing tasks. There is no problem processing five or fewer image files, in that all the image-processing celery tasks complete successfully.

When I have the image-processing tasks (including find_faces) iterate over 100 images, there are 10-30 images for which the find_faces task does not complete. When I look at the celery tasks with flower v0.9.7, I see that the find_faces task status is "started" for the images that did not complete, and "success" for all the other images. The status of these "started" tasks never changes, and no errors or exceptions are reported. I can then run the image-processing tasks, including find_faces, on each of these images individually, and the task status is "success". These results do not change whether I run celery as a daemon or locally, or whether I run the django app with wsgi and apache or with runserver. Flower also reports that retries = 0 for all my tasks.

I have CELERYD_TASK_SOFT_TIME_LIMIT = 60 set globally in my Django app, and max_retries=5 set on the find_faces task.
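For reference, a sketch of how those knobs could look in Celery 4's lowercase configuration style (the hard time limit and the worker-recycling value here are illustrative assumptions, not my actual config):

```python
# celeryconfig.py -- illustrative values, not a verified fix
task_soft_time_limit = 60        # raises SoftTimeLimitExceeded inside the task after 60 s
task_time_limit = 120            # hard kill, with some margin above the soft limit
task_acks_late = True            # re-deliver a task if its worker dies mid-run
worker_max_tasks_per_child = 50  # recycle worker processes to cap memory growth
```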

@app.task(bind=True, max_retries=5)
def find_faces_task(self, document_id, use_cuda=settings.USE_CUDA):
    logger.debug("find_faces_task START")
    try:
        temp_image = None
        temp_file = None
        from memorabilia.models import TaskStatus, Document      
        args = "document_id=%s, use_cuda=%s" % (document_id, use_cuda)
        ts = TaskStatus(document_id_id=document_id, task_id=self.request.id, task_name='find_faces_task', task_args=args, task_status=TaskStatus.PENDING)
        ts.save()
        import time
        time_start = time.time()
        # Check if we already have the faces for this document
        from biometric_identification.models import Face
        if len(Face.objects.filter(document_id=document_id)) != 0:
            # This document has already been scanned, so need to remove it and rescan
            # Have to manually delete each object per django docs to insure the 
            # model delete method is run to update the metadata.
            logger.debug("Document %s has already been scanned" % document_id)
            faces = Face.objects.filter(document_id=document_id)
            for face in faces:
                face.delete()
                logger.debug("Deleted face=%s" % face.tag_value.value)
        document = Document.objects.get(document_id=document_id)
        image_file = document.get_default_image_file(settings.DEFAULT_DISPLAY_IMAGE)
        image_path = image_file.path
        time_start_looking = time.time()
        temp_file = open(image_path, 'rb')
        temp_image = Image.open(temp_file)
        logger.debug("temp_image.mode=%s" % temp_image.mode)
        width, height = temp_image.size
        image = face_recognition.load_image_file(temp_file)
        # Get the coordinates of each face
        if use_cuda:
            # With CUDA installed
            logger.debug("Using CUDA for face recognition")
            face_locations = face_recognition.face_locations(image, model="cnn", number_of_times_to_upsample=0) 
        else:
            # without CUDA installed
            logger.debug("NOT using CUDA for face recognition")
            face_locations = face_recognition.face_locations(image, model="hog", number_of_times_to_upsample=2)
        time_find_faces = time.time()
        # Get the face encodings for each face in the picture    
        face_encodings = face_recognition.face_encodings(image, known_face_locations=face_locations) 
        logger.debug("Found %s face locations and %s encodings" % (len(face_locations), len(face_encodings)))
        time_face_encodings = time.time()
        # Save the faces found in the database
        for location, encoding in zip(face_locations, face_encodings):
            # Create the new Face object and load in the document, encoding, and location of a face found
            # Locations seem to be of the form (y,x)
            from memorabilia.models import MetaData, MetaDataValue
            tag_type_people = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Tag_types')[0].metadata_id, value='People')[0]
            tag_value_unknown = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Unknown')[0].metadata_id, value='Unknown')[0]
            new_face = Face(document=document, face_encoding=numpy_to_json(encoding), face_location=location, image_size={'width': width, "height":height}, tag_type=tag_type_people, tag_value=tag_value_unknown)         
            # save the newly found Face object
            new_face.save()
            logger.debug("Saved new_face %s" % new_face.face_file) 
        time_end = time.time()
        logger.debug("total time = {}".format(time_end - time_start))
        logger.debug("time to find faces = {}".format(time_find_faces - time_start_looking))
        logger.debug("time to find encodings = {}".format(time_face_encodings - time_find_faces))
        ts.task_status = TaskStatus.SUCCESS
        ts.comment = "Found %s faces" % len(face_encodings)
        return document_id
    except Exception as e:
        logger.exception("Hit an exception in find_faces_task %s" % str(e))
        ts.task_status = TaskStatus.ERROR
        ts.comment = "An exception while finding faces: %s" % repr(e)
    finally:
        logger.debug("Finally clause in find_faces_task")
        if temp_image:
            temp_image.close()
        if temp_file:
            temp_file.close()
        ts.save(update_fields=['task_status', 'comment'])
        logger.debug("find_faces_task END")

The find_faces task is called as part of a larger chain of tasks that process the images. Each image file goes through this chain, where step_1 and step_2 are chords for different image-processing steps:

step_1 = chord( group( clean ), chordfinisher.si() ) # clean creates different image sizes
step_2 = chord( group( jobs ), chordfinisher.si() )  # jobs include find_faces
transaction.on_commit(lambda: chain(step_1, step_2, faces_2, ocr_job, change_state_task.si(document_id, 'ready')).delay())

@app.task
def chordfinisher( *args, **kwargs ):
    return "OK"

The images are large, so the find_faces task can take up to 30 seconds to complete. I thought CELERYD_TASK_SOFT_TIME_LIMIT = 60 would take care of this long processing time.

I am by no means a celery expert, so I assume there is a celery setting or option I need to enable to make sure the find_faces task always completes. I just don't know what that would be.

After some more research, I took the advice of Lewis Carroll, in the post "Beware the oom-killer, my son! The jaws that bite, the claws that catch!", and this post "Chaining Chords produces enormously big messages causing OOM on workers", and this post "WorkerLostError: Worker exited prematurely: exitcode 155".
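As a rough, hypothetical illustration of the second post's point (this is NOT Celery's real serialization format, just a toy model for scale): a chord message carries a signature for every task in its header, and chaining chords nests the whole downstream chord inside the body, so the broker payload grows with the number of images per group:

```python
import json

# Toy model of a chord message: a header with one signature per image,
# plus a body holding the callback (hypothetical format, for scale only).
def fake_signature(task_name, doc_id):
    return {"task": task_name, "args": [doc_id], "kwargs": {}, "options": {}}

def chord_message(n_images, body):
    header = [fake_signature("find_faces_task", i) for i in range(n_images)]
    return {"header": header, "body": body}

finisher = fake_signature("chordfinisher", None)
small = len(json.dumps(chord_message(5, finisher)))
# Chaining: the second chord rides along inside the first chord's body.
nested = chord_message(100, chord_message(100, finisher))
large = len(json.dumps(nested))
print(small, large)  # the nested 100-image message is many times bigger
```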

It seems my celery workers were running out of memory, as I did find traces of the dreaded oom-killer in my system logs. I reconfigured my tasks into a plain chain (removed all the groups and chords), so each task for each image runs separately, in sequence, and the tasks completed successfully no matter how many images I processed.
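The reworked pipeline looks roughly like this (a sketch only; the per-step signatures are simplified, and `clean`, `faces_2`, `ocr_job`, and `change_state_task` are the tasks from my original chain above):

```python
# Every step for an image now runs strictly in sequence, so only one
# task is live (and holding image data in memory) at a time.
transaction.on_commit(lambda: chain(
    clean.si(document_id),
    find_faces_task.si(document_id),
    faces_2.si(document_id),
    ocr_job.si(document_id),
    change_state_task.si(document_id, 'ready'),
).delay())
```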