Celery worker only works once

Full steps:

  1. Start Django

  2. Start a celery worker (a minimal sketch of the app module it references follows these steps)

    python manage.py celery worker --app=celery_worker:app -Ofair -n W1

  3. Upload a file containing a list of URLs; loop over the list and send each URL to a fetch_article task

  4. The worker processes the tasks

  5. Upload another URL list file

  6. The worker does nothing
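The worker command points --app at a celery_worker module. For context, a minimal sketch of what that module typically looks like in a Celery 3.x / Django setup (module layout and the settings path are assumptions, not code from the question):

# celery_worker.py -- hypothetical sketch of the module named in
# --app=celery_worker:app; the settings module path is an assumption.
from __future__ import absolute_import

import os

from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('celery_worker')

# Celery 3.x style: read CELERY_* options (including CELERY_ALWAYS_EAGER)
# straight from the Django settings module.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)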

views.py:

import csv

from django.views.decorators.csrf import csrf_exempt

# App-relative import paths are assumptions; JsonResponse/JsonStatus here are
# project-specific helpers (not Django's JsonResponse, which takes a dict),
# and their import is not shown in the question.
from .models import Job
from .tasks import fetch_article


@csrf_exempt
def upload(request):

    job_name = request.POST.get('job_name')
    if not job_name:
        return JsonResponse(JsonStatus.Error)

    if len(request.FILES) == 1:
        # request.FILES.values() is not indexable on Python 3; list() works on both
        yq_data = list(request.FILES.values())[0]
    else:
        return JsonResponse(JsonStatus.Error)

    job = Job.objects.create(name=job_name)

    reader = csv.reader(yq_data, delimiter=',')

    task_count = 0

    next(reader)  # skip the header row
    for row in reader:
        url = row[0].strip()
        fetch_article.delay(job.id, url)
        # fetch_article.apply_async(args=[job.id, url], queue=job.queue_name)
        task_count += 1

    # print 'qn%s' % job.queue_name
    # rp = celery_app.control.add_consumer(queue=job.queue_name, reply=True)
    # print rp

    job.task_count = task_count
    job.save()

    return JsonResponse(JsonStatus.OK, msg=task_count)
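As an aside, the commented-out lines were an attempt to route each job to its own queue. A worker does not consume from queues it was not started with, so that variant also needs the add_consumer call. Roughly (a sketch; it assumes the Job model has a queue_name field and celery_app is the Celery() instance):

# Per-job-queue variant of the dispatch loop above (sketch).
fetch_article.apply_async(args=[job.id, url], queue=job.queue_name)

# Tell running workers to also consume from the new queue; without this
# the messages sit in the broker and the worker appears to do nothing.
reply = celery_app.control.add_consumer(queue=job.queue_name, reply=True)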

tasks.py:

import logging

from celery import shared_task

# App-relative import path is an assumption; get_processor_cls is a
# project-specific helper, see the stub below.
from .models import Article

logger = logging.getLogger(__name__)


@shared_task()
def fetch_article(job_id, url):

    logger.info(u'fetch_article:%s' % url)

    processor_cls = get_processor_cls(url)

    a = Article(job_id=job_id, url=url)
    try:
        ap = processor_cls(url)
        title, text = ap.process()
        a.title = title
        a.content = text

    except Exception as e:
        a.status = 2  # mark as failed
        a.error = str(e)  # store the message, not the exception object
        logger.error(u'fetch_article:%s error:%s' % (url, e))

    a.save()
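get_processor_cls is a project-specific helper the question does not show; a hypothetical stub of the contract fetch_article relies on (every name here is invented for illustration):

# Hypothetical stub: the task only assumes the returned class takes a url
# and that .process() returns a (title, text) pair.
class DefaultProcessor(object):

    def __init__(self, url):
        self.url = url

    def process(self):
        # A real processor would fetch and parse the page here.
        return u'', u''


def get_processor_cls(url):
    # A real implementation would pick a class based on the url's domain.
    return DefaultProcessor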

OK, I found the problem.

I had set CELERY_ALWAYS_EAGER = True in my settings, so the tasks ran inside the Django main process and the worker had nothing to do.

From the docs:

CELERY_ALWAYS_EAGER If this is True, all tasks will be executed locally by blocking until the task returns. apply_async() and Task.delay() will return an EagerResult instance, which emulates the API and behavior of AsyncResult, except the result is already evaluated.

That is, tasks will be executed locally instead of being sent to the queue.
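So the fix is simply to disable the setting so tasks actually reach the broker:

# settings.py
# CELERY_ALWAYS_EAGER = True   # this made .delay() run tasks inline
CELERY_ALWAYS_EAGER = False    # send tasks to the queue for the worker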

I'm still puzzled why the worker worked the first time; maybe there were some URLs left over in the queue from an earlier job.
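One way to rule that out next time is to check the effective setting and ask the workers what they are holding (a sketch; the import path comes from the worker command above):

# python manage.py shell
from celery_worker import app

# True means .delay() runs tasks inline in this process.
print(app.conf.CELERY_ALWAYS_EAGER)

# Stale messages from an earlier run would show up here as tasks the
# worker picks up as soon as it starts.
insp = app.control.inspect()
print(insp.active())
print(insp.reserved())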