Celery worker only works once
Full steps:
Start Django
Start a Celery worker (a sketch of the assumed celery_worker:app module follows these steps):
python manage.py celery worker --app=celery_worker:app -Ofair -n W1
Upload a URL-list file; loop over the list and send each URL to a fetch_article task
The worker processes the tasks
Upload another URL-list file
The worker does nothing
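For context, a minimal sketch of what the celery_worker module referenced by --app=celery_worker:app might look like with a Celery 3.x / django-celery style setup; the project name "myproject" and the exact wiring are assumptions, not taken from the question:

# celery_worker.py (sketch, not the original module)
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')  # hypothetical project name

app = Celery('celery_worker')
app.config_from_object('django.conf:settings')  # picks up CELERY_* settings, including CELERY_ALWAYS_EAGER
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # find tasks.py in each installed app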
views.py:
@csrf_exempt
def upload(request):
    job_name = request.POST.get('job_name')
    if not job_name:
        return JsonResponse(JsonStatus.Error)
    if len(request.FILES) == 1:
        yq_data = request.FILES.values()[0]
    else:
        return JsonResponse(JsonStatus.Error)
    job = Job.objects.create(name=job_name)
    reader = csv.reader(yq_data, delimiter=',')
    task_count = 0
    next(reader)  # skip the header row
    for row in reader:
        url = row[0].strip()
        fetch_article.delay(job.id, url)
        # fetch_article.apply_async(args=[job.id, url], queue=job.queue_name)
        task_count += 1
    # print 'qn%s' % job.queue_name
    # rp = celery_app.control.add_consumer(queue=job.queue_name, reply=True)
    # print rp
    job.task_count = task_count
    job.save()
    return JsonResponse(JsonStatus.OK, msg=task_count)
tasks.py:
@shared_task()
def fetch_article(job_id, url):
    logger.info(u'fetch_article:%s' % url)
    Processer = get_processor_cls(url)
    a = Article(job_id=job_id, url=url)
    try:
        ap = Processer(url)
        title, text = ap.process()
        a.title = title
        a.content = text
    except Exception as e:
        a.status = 2
        a.error = e
        logger.error(u'fetch_article:%s error:%s' % (url, e))
    a.save()
OK, I found the problem.
I had set CELERY_ALWAYS_EAGER = True in my settings, so the tasks run in the Django main process and the worker has nothing to do.
From the docs:
CELERY_ALWAYS_EAGER: If this is True, all tasks will be executed locally by blocking until the task returns. apply_async() and Task.delay() will return an EagerResult instance, which emulates the API and behavior of AsyncResult, except the result is already evaluated.
That is, tasks will be executed locally instead of being sent to the queue.
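So the fix is to drop that setting (or set it to False) in the settings used for normal runs. A minimal sketch, assuming old-style uppercase Celery settings as in the question:

# settings.py (sketch of the fix)
CELERY_ALWAYS_EAGER = False  # the default; tasks go to the broker and are picked up by the worker
# Keep eager mode only in a separate test settings module, if you need it at all.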
I'm still confused about why the worker did work the first time; maybe there were some URLs left in the queue from an earlier run.
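If you want to check that guess, a hedged sketch using the Celery control API (celery_worker is the app module from the worker command above; the module name is an assumption):

# inspect_queue.py (sketch)
from celery_worker import app

insp = app.control.inspect()
print(insp.active())    # tasks the workers are executing right now
print(insp.reserved())  # tasks prefetched by workers but not started yet
# app.control.purge()   # drops every message still waiting in the broker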