Celery/redis tasks don't always complete - not sure why or how to fix it
I am running celery v 4.0.3 / redis v 4.09 with a django v 3.0.1 application (Python v 3.6.9). I also use face_recognition in a celery task, find_faces, to find the faces in images I upload to the application, along with other image-processing celery tasks. There are no problems processing five or fewer image files: all the image-processing celery tasks complete successfully.
When I have the image-processing tasks (including find_faces) iterate over 100 images, there are 10-30 images for which the find_faces task never completes. When I look at the celery tasks with flower v 0.9.7, the find_faces task state is "Started" for the images that did not complete; for all the other images the find_faces task state is "Success". The state of these "Started" tasks never changes, and no errors or exceptions are reported. I can then run the image-processing tasks, including find_faces, on each of these images individually, and the task state is "Success". These results do not change whether I run celery as a daemon or locally, or whether I run the django application under wsgi and apache or with runserver. Flower also reports retries = 0 for all my tasks.
I have CELERYD_TASK_SOFT_TIME_LIMIT = 60 set globally in the django application, and max_retries=5 set on the find_faces task.
@app.task(bind=True, max_retries=5)
def find_faces_task(self, document_id, use_cuda=settings.USE_CUDA):
    logger.debug("find_faces_task START")
    # Initialize these before the try block so the finally clause can
    # safely check them even if an exception occurs before they are opened.
    temp_file = None
    temp_image = None
    try:
        from memorabilia.models import TaskStatus, Document
        args = "document_id=%s, use_cuda=%s" % (document_id, use_cuda)
        ts = TaskStatus(document_id_id=document_id, task_id=self.request.id, task_name='find_faces_task', task_args=args, task_status=TaskStatus.PENDING)
        ts.save()
        import time
        time_start = time.time()
        # Check if we already have the faces for this document
        from biometric_identification.models import Face
        if Face.objects.filter(document_id=document_id).exists():
            # This document has already been scanned, so we need to remove the
            # old faces and rescan. Each object has to be deleted individually
            # (per the django docs) to ensure the model's delete method runs
            # and updates the metadata.
            logger.debug("Document %s has already been scanned" % document_id)
            faces = Face.objects.filter(document_id=document_id)
            for face in faces:
                face.delete()
                logger.debug("Deleted face=%s" % face.tag_value.value)
        document = Document.objects.get(document_id=document_id)
        image_file = document.get_default_image_file(settings.DEFAULT_DISPLAY_IMAGE)
        image_path = image_file.path
        time_start_looking = time.time()
        temp_file = open(image_path, 'rb')
        temp_image = Image.open(temp_file)
        logger.debug("temp_image.mode=%s" % temp_image.mode)
        width, height = temp_image.size
        image = face_recognition.load_image_file(temp_file)
        # Get the coordinates of each face
        if use_cuda:
            # With CUDA installed
            logger.debug("Using CUDA for face recognition")
            face_locations = face_recognition.face_locations(image, model="cnn", number_of_times_to_upsample=0)
        else:
            # Without CUDA installed
            logger.debug("NOT using CUDA for face recognition")
            face_locations = face_recognition.face_locations(image, model="hog", number_of_times_to_upsample=2)
        time_find_faces = time.time()
        # Get the face encodings for each face in the picture
        face_encodings = face_recognition.face_encodings(image, known_face_locations=face_locations)
        logger.debug("Found %s face locations and %s encodings" % (len(face_locations), len(face_encodings)))
        time_face_encodings = time.time()
        # Save the faces found in the database
        for location, encoding in zip(face_locations, face_encodings):
            # Create the new Face object and load in the document, encoding, and location of a face found
            # Locations seem to be of the form (y, x)
            from memorabilia.models import MetaData, MetaDataValue
            tag_type_people = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Tag_types')[0].metadata_id, value='People')[0]
            tag_value_unknown = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Unknown')[0].metadata_id, value='Unknown')[0]
            new_face = Face(document=document, face_encoding=numpy_to_json(encoding), face_location=location, image_size={'width': width, 'height': height}, tag_type=tag_type_people, tag_value=tag_value_unknown)
            # Save the newly found Face object
            new_face.save()
            logger.debug("Saved new_face %s" % new_face.face_file)
        time_end = time.time()
        logger.debug("total time = {}".format(time_end - time_start))
        logger.debug("time to find faces = {}".format(time_find_faces - time_start_looking))
        logger.debug("time to find encodings = {}".format(time_face_encodings - time_find_faces))
        ts.task_status = TaskStatus.SUCCESS
        ts.comment = "Found %s faces" % len(face_encodings)
        return document_id
    except Exception as e:
        logger.exception("Hit an exception in find_faces_task %s" % str(e))
        ts.task_status = TaskStatus.ERROR
        ts.comment = "An exception while finding faces: %s" % repr(e)
    finally:
        logger.debug("Finally clause in find_faces_task")
        if temp_image:
            temp_image.close()
        if temp_file:
            temp_file.close()
        ts.save(update_fields=['task_status', 'comment'])
        logger.debug("find_faces_task END")
The find_faces task is called as part of a larger chain of tasks that manipulate the images. Each image file goes through this chain, where step_1 and step_2 are chords for different image-processing steps:
step_1 = chord( group( clean ), chordfinisher.si() ) # clean creates different image sizes
step_2 = chord( group( jobs ), chordfinisher.si() ) # jobs include find_faces
transaction.on_commit(lambda: chain(step_1, step_2, faces_2, ocr_job, change_state_task.si(document_id, 'ready')).delay())
@app.task
def chordfinisher(*args, **kwargs):
    return "OK"
The images are large, so the find_faces task can take up to 30 seconds to complete. I thought CELERYD_TASK_SOFT_TIME_LIMIT = 60 would take care of this long processing time.
I am by no means a celery expert, so I assume there is a celery setting or option I need to enable to make sure the find_faces task always completes. I just don't know what that would be.
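For reference, here is a sketch of the time-limit and worker settings I was looking at, using the lowercase celery 4.x setting names. The values are illustrative, not a known fix; worker_max_tasks_per_child is included only because recycling worker processes is a common way to cap memory growth:

```python
# Sketch only: celery 4.x configuration (illustrative values).
app.conf.task_soft_time_limit = 60        # raises SoftTimeLimitExceeded inside the task
app.conf.task_time_limit = 120            # hard limit: the worker process is killed
app.conf.task_acks_late = True            # re-deliver a task if the worker dies mid-run
app.conf.worker_max_tasks_per_child = 50  # recycle worker processes to limit memory growth
```

With task_acks_late, a task whose worker is killed (e.g. by the oom-killer) is re-queued instead of being lost, provided the task is safe to run twice.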
After some more research, I took the advice of Lewis Carroll, in the post "Beware the oom-killer, my son! The jaws that bite, the claws that catch!", this post Chaining Chords produces enormously big messages causing OOM on workers, and this post WorkerLostError: Worker exited prematurely: exitcode 155.
My celery workers were apparently running out of memory, as I did find traces of the dreaded oom-killer in my syslog. I reconfigured my tasks to be chains only (removed all the groups and chords), so each task runs separately and in sequence for each image, and the tasks completed successfully no matter how many images I processed.
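As a sketch, the restructuring amounts to replacing the chord/group pipeline with one flat chain of immutable signatures per image, so nothing runs in parallel and no large chord result messages accumulate in redis. The task names here are stand-ins mirroring the snippet above, not my exact code:

```python
# Before: chords over groups (parallel tasks, large chord result messages)
# step_1 = chord(group(clean), chordfinisher.si())
# step_2 = chord(group(jobs), chordfinisher.si())

# After: a flat chain per image. Each task runs alone, in order, and the
# immutable signatures (.si) mean no results are passed between steps.
pipeline = chain(
    clean_task.si(document_id),        # stand-in for the "clean" step
    find_faces_task.si(document_id),
    faces_2.si(document_id),
    ocr_job.si(document_id),
    change_state_task.si(document_id, 'ready'),
)
transaction.on_commit(lambda: pipeline.delay())
```

The trade-off is losing the parallelism of the groups, but with memory-hungry tasks like face_recognition the sequential chain keeps the peak memory of each worker bounded.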