使用龙卷风 AsyncHTTPTestCase 连接超时测试

tests using tornado AsyncHTTPTestCase connection timeout

我在单元测试 Tornado 应用程序时遇到问题,请帮助我。错误堆栈跟踪:

Error Traceback (most recent call last): File "/Users/doc/python/lib/python3.5/site-packages/tornado/testing.py", line 432, in tearDown timeout=get_async_test_timeout()) File "/Users/doc/python-virt1/lib/python3.5/site-packages/tornado/ioloop.py", line 456, in run_sync raise TimeoutError('Operation timed out after %s seconds' % timeout) tornado.ioloop.TimeoutError: Operation timed out after 5 seconds

ERROR:tornado.application:Future exception was never retrieved: Traceback (most recent call last): File "/Users/doc/python/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run yielded = self.gen.throw(*exc_info) File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/types.py", line 179, in throw return self.__wrapped.throw(tp, *rest) File "/Users/doc/python-virt1/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run value = future.result() File "/Users/doc/python-virt1/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result raise_exc_info(self._exc_info) File "", line 3, in raise_exc_info tornado.curl_httpclient.CurlError: HTTP 599: Empty reply from server Traceback (most recent call last):

test.py 文件:

from tornado.testing import gen_test
from tests.api_tests.base import AbstractApplicationTestBase


class ApiRestTest(AbstractApplicationTestBase):
    def setUp(self):
        super(ApiRestTest, self).setUp()
        self.prepareDatabase(self.config)
        self.insert_user(config=self.config)

api_test/base.py

import logging
from api import server
from commons.constants import config
from tests.base import BaseTestClass


class AbstractApplicationTestBase(BaseTestClass):
    def get_app(self):
        application = server.get_application(self.config)

        application.settings[config.APPLICATION_DB] = self.db
        application.settings[config.APPLICATION_CONFIG] = self.config
        application.settings[config.APPLICATION_AES] = self.aes
        application.settings[config.APPLICATION_FS] = self.fs
        logging.info(self.config.DEPLOY_API)

        return application

test/base.py

import logging
from datetime import datetime
import motor.motor_tornado
from motor import MotorGridFSBucket
from pymongo import MongoClient
from tornado import escape
from tornado import gen
from tornado.testing import AsyncHTTPTestCase

class BaseTestClass(AsyncHTTPTestCase):
     @classmethod
     def setUpClass(self):
         super(BaseTestClass, self).setUpClass()
         self.config = Config(Environment.TESTS.value)
         self.client = utils.http_client(self.config.PROXY_HOST, self.config.PROXY_PORT)
         self.db = motor.motor_tornado.MotorClient(self.config.MONGODB_URI)[self.config.MONGODB_NAME]
         self.fs = MotorGridFSBucket(self.db)

AsyncHTTPTestCase 在每次测试开始时创建一个新的 IOLoop,并在每次测试结束时销毁它。但是,您在整个测试开始时创建了一个 MotorClient class,并使用默认的全局 IOLoop 而不是专门为每个测试创建的 IOLoop。

我相信您只需将 setUpClass 替换为 setUp 即可。然后,您将在 AsyncHTTPTestCase 设置其 IOLoop 后创建您的 MotorClient。为清楚起见,显式传递 IOLoop:

client = MotorClient(io_loop=self.io_loop)
self.db = client[self.config.MONGODB_NAME]

我注意到的一些事情。

  • 我看到的主要问题是你在你的 setUp 方法中通过电机做一些 IO,而 setUp 不能是 gen_test (AFAIK)。如果您需要这种类型的功能,您可能需要下拉到 pymongo 并同步调用数据库以存根这些数据库装置。
  • 您是故意 运行 反对真实数据库吗?这些应该是 mongodb 的真正集成测试吗?当我编写这些类型的测试时,我通常会使用 Mock class 并模拟我与 MongoDb.
  • 的所有交互
  • 此外,创建 AsyncHttpClient 对象是免费的,因此将其从 settings/config 对象传递到每个处理程序可能不是最佳做法。

这是我的一个项目中的示例处理程序测试:

example_fixture = [{'foo': 'bar'}]
URL = r'/list'

    class BaseListHandlerTests(BaseHandlerTestCase):
        """
        Test the abstract list handler
        """
        def setUp(self):
            self.mongo_client = Mock()
            self.fixture = deepcopy(example_fixture)
            # Must be run last
            BaseHandlerTestCase.setUp(self)

        def get_app(self):
            return Application([
                (URL, BaseListHandler,
                 dict(mongo_client=self.mongo_client))
            ], **settings)

        def test_get_list_of_objects_returns_200_with_results(self):
            self.mongo_client.find.return_value = self.get_future(example_fixture)
            response = self.fetch('{}'.format(URL))
            response_json = self.to_json(response)
            self.assertListEqual(response_json.get('results'), example_fixture)
            self.assertEqual(response.code, 200)

        def test_get_list_of_objects_returns_200_with_no_results(self):
            self.mongo_client.find.return_value = self.get_future([])
            response = self.fetch('{}'.format(URL))
            self.assertEqual(response.code, 200)

        def test_get_list_of_objects_returns_500_with_exception(self):
            self.mongo_client.find.return_value = self.get_future_with_exception(Exception('FAILED!'))
            response = self.fetch('{}'.format(URL))
            self.assertEqual(response.code, 500)

完成这项工作的关键是我的 mongo_client 被传递到路由对象本身。所以我的处理程序初始化需要一个 mongo_client kwarg.

class BaseListHandler(BaseHandler):
    """
    Base list handler
    """
    mongo_client = None

    def initialize(self, mongo_client=None):
        """
        Rest Client Initialize
        Args:
            mongo_client: The client used to access documents for this handler

        Returns:

        """
        BaseHandler.initialize(self)
        self.mongo_client = mongo_client