Getting the status of Celery tasks on another server
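I have two servers running Celery.
1 - Web server
2 - Worker server
The web server sends tasks to the worker server, and the worker executes them.
I would like to monitor the status of these tasks from the web server. Any ideas?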
views.py
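from django.contrib.auth.decorators import user_passes_test
from django.http import HttpResponseRedirect
from django.urls import reverse

# AnnotationDocument and image_processing are imported from the project's own modules.


@user_passes_test(lambda u: u.is_superuser)
def taskCompute(request, uuid):
    """
    The uuid identifies the image the process will be applied to.
    It is taken from the URL that triggers this view and is used to look up the image we are working with.
    """
    doc = AnnotationDocument.objects.get(uuid=uuid)
    image_name = doc.name_id
    # Mark the document as queued before handing the work to Celery.
    doc.status = 'Q'
    doc.save()
    image_processing.delay(uuid, image_name)
    return HttpResponseRedirect(reverse('list_admin'))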
tasks.py
from __future__ import absolute_import, print_function, unicode_literals

import os
import subprocess as sub

from celery import Celery
from PIL import Image

app = Celery('tasks',
             broker='amqp://xxxxx',
             backend='rpc://')

NFS_PATH = '/home/administrator/nfs'


@app.task
def image_processing(uuid, image_name):
    """
    WORKERS job: - Get the images from the NFS pre-processing path
                 - Move them to the worker path
                 - Process the images
                 - Create the PDF
                 - Leave the results in the NFS post-processing path
    """
    current_working_path = os.path.join(NFS_PATH, uuid)
    local_filename_image = os.path.join(current_working_path, image_name)
    local_filename_annotations = os.path.join(current_working_path, "annotations.json")

    # Fall back to the literal string "None" when there is no annotations file.
    if not os.path.isfile(local_filename_annotations):
        local_filename_annotations = "None"

    cmd = '/home/administrator/Envs/CESSOR-env/bin/python' \
        + ' /home/administrator/CESSOR/compute.py' \
        + ' --iImage=' + local_filename_image
    print("Command:", cmd)

    # Run the processing script and wait for it to finish.
    p = sub.Popen(cmd.split(), stdout=sub.PIPE, stderr=sub.PIPE)
    output, errors = p.communicate()
    print(output)

    # Get thumbnail of the processed image to render it in the detail.html template:
    image_path = os.path.join(current_working_path, 'process_result.jpg')
    img = Image.open(image_path)
    w, h = img.size
    a = 301.0
    b = 200.0
    ptgx = a / w
    ptgy = b / h
    w2 = int(w * ptgx)
    h2 = int(h * ptgy)
    # Image.LANCZOS is the filter that the older name Image.ANTIALIAS referred to.
    img_thumbnail = img.resize((w2, h2), Image.LANCZOS)

    # Save the thumbnail:
    input_thumbnail_filename = 'process_result_thnl.png'
    img_thumbnail.save(os.path.join(current_working_path, input_thumbnail_filename))
    return uuid
You need to configure Celery to store task results in the Django database. To do this, use the django-celery-results library. See the Celery documentation on configuring the Django backend.
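As a rough sketch of that configuration, the settings fragment might look like the following. It assumes the Celery app reads its options from Django settings under a `CELERY_` prefix, for example via `app.config_from_object('django.conf:settings', namespace='CELERY')`, which the `tasks.py` in the question does not do as posted. The two `django.contrib` entries are placeholders for whatever the project already lists.

```python
# settings.py (fragment); a sketch, not the asker's actual settings.
INSTALLED_APPS = [
    "django.contrib.auth",          # placeholder for the project's existing apps
    "django.contrib.contenttypes",  # placeholder for the project's existing apps
    "django_celery_results",        # provides the TaskResult model and its migrations
]

# Read by Celery only if the app loads Django settings with namespace="CELERY".
CELERY_RESULT_BACKEND = "django-db"
```

After adding the app, `python manage.py migrate django_celery_results` creates the result tables. With this backend the worker writes results straight into that database, so the worker server would need the same Django settings and network access to the database the web server uses.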
Once that is done, you can write a view that queries the TaskResult model just as you would query any other Django data.
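One way such a view might look is sketched below. It assumes the web server kept the task id (the `.id` of the object returned by `image_processing.delay(...)`), and it treats a task with no stored row as `PENDING`, on the assumption that the worker has not recorded any state for it yet. The names `summarize_task`, `task_status` and `FINISHED_STATES` are hypothetical, not part of django-celery-results.

```python
# States after which Celery does not change the task any further.
FINISHED_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def summarize_task(row, task_id):
    """Reduce a TaskResult-like row (or None) to a JSON-friendly dict."""
    if row is None:
        # Assumption: no row yet means the worker has not stored a state.
        return {"task_id": task_id, "status": "PENDING",
                "finished": False, "date_done": None}
    return {
        "task_id": row.task_id,
        "status": row.status,
        "finished": row.status in FINISHED_STATES,
        "date_done": row.date_done.isoformat() if row.date_done else None,
    }


def task_status(request, task_id):
    """Hypothetical Django view returning the stored state of one task."""
    # Imported lazily so summarize_task() can be exercised without a
    # configured Django project; a real views.py would import at the top.
    from django.http import JsonResponse
    from django_celery_results.models import TaskResult

    row = TaskResult.objects.filter(task_id=task_id).first()
    return JsonResponse(summarize_task(row, task_id))
```

In the asker's project this view would probably carry the same `user_passes_test` decorator as `taskCompute`, and the task id could be stored on the `AnnotationDocument` row when the task is queued.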