为什么 Celery 任务测试结果不一致?

Why are Celery task test result inconsistent?

我已经为两个 Celery 任务编写了两个简单的集成测试,但是 当我 运行 他们得到不一致的结果。我可以 运行 他们一分钟 一个或两个都会通过,然后 运行 他们几秒钟又一 或者两者都会失败。为什么这些结果与一个不一致 测试运行下一个?另外,这些测试实际上是在测试 Celery 任务是否被发送到队列并由工作人员执行?

谢谢!

以下是任务:

# apps/photos/tasks.py
from __future__ import absolute_import
from conf.celeryapp import app

@app.task
def hello():
    return 'Hello world!'

@app.task
def add(x, y):
    return x + y

测试如下:

# apps/photos/tests/task_tests.py
from django.test import TestCase
from django.test.utils import override_settings
from apps.photos.tasks import hello, add

class TaskTestIT(TestCase):
    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')

    def test_hello(self):
        result = hello.delay()
        self.assertTrue(result.successful())
        self.assertEqual(str(result.result), 'Hello world!')

    def test_add(self):
        result = add.delay(1, 1)
        self.assertTrue(result.successful())
        self.assertEquals(result.get(), 2)

我 运行 我用这个命令测试:

./manage.py test -s

我使用 django-nose 作为我的测试 运行ner:

# conf/settings/base.py
USE_DJANGO_NOSE = True
if USE_DJANGO_NOSE:
    INSTALLED_APPS += ( 'django_nose', )
    TEST_RUNNER = 'django_nose.NoseTestSuiteRunner'

这是我的 Celery 应用程序和配置文件:

# conf/celeryapp.py
from celery import Celery

app = Celery('celeryapp')
app.config_from_object('conf.celeryconfig')
app.autodiscover_tasks(['apps.photos'])

# conf/celeryconfig.py
from kombu import Queue, Exchange

BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('photos', Exchange('photos'), routing_key='photos'),
    Queue('mail', Exchange('mail'), routing_key='mail'),
)
CELERY_ROUTES = (
    {'apps.photos.tasks.hello': {'queue': 'default', 'routing_key': 'default'}},
    {'apps.photos.tasks.add': {'queue': 'photos', 'routing_key': 'photos'}},
    {'apps.photos.tasks.send_email': {'queue': 'mail', 'routing_key': 'mail'}},
)

Task.delay() doesn't return the actual result of the task, it returns an AsyncResult 对象,它将包含任务执行时的结果。您的不同结果是由于有时任务执行速度比您的测试开始检查其结果更快,有时它需要更长的时间。这取决于你的系统负载等

你应该做的是先调用result.get()等待任务完成执行,然后你才应该检查它是否是.successful()等等

例如,以下应该会产生一致的结果:

def test_hello(self):
        result = hello.delay()
        result.get()
        self.assertTrue(result.successful())
        self.assertEqual(str(result.result), 'Hello world!')