龙卷风测试停止测试用例 io_loop
Tornado testing stop testcase io_loop
我在 class
的某些方法中有永远的 while 循环
@gen.coroutine
def process_task_queue(self):
while True:
#print ("Hello from async while")
if not self.q:
continue
#just example
据我了解 "while" 是非阻塞相对龙卷风 io_loop。
比我用 tornado.testing
创建单元测试
class TestMSGB(AsyncTestCase):
#@gen_test
def test_init(self):
start = time.time()
def done():
self.io_loop.stop()
print "Done"
duration = time.time() - start
#print "While worked for {} seconds".format(duration)
print "+++++++++ Test init +++++++++ "
s1 = Stream(iol=self.io_loop)
self.io_loop.current().spawn_callback(s1.process_task_queue)
self.io_loop.call_later(time.time() + 1, done) # try this way
self.io_loop.handle_callback_exception(done) # also try this way
self.io_loop.start()
正如我所料,它必须在1秒后停止,但它继续在while中。那么我如何做一个异步 while 循环,它在龙卷风 io_loop 停止时停止并且可以与 while class?
的其他实例并行工作
更新:
修改代码但测试也被阻止:
@gen.coroutine
def process_task_queue(self):
print ("Hello from async while")
while True:
item = yield self.q.get()
print ("while iter... item is {}".format(item))
try:
print('Doing work on %s' % item)
yield gen.sleep(0.1)
finally:
q.task_done()
和测试用例:
@gen_test
def test_init(self):
start = time.time()
print "+++++++++ Test init +++++++++ "
s1 = Stream(iol=self.io_loop)
def producer():
for item in range(5):
yield s1.q.put(item)
print('Put %s' % item)
# as in example
self.io_loop.current().spawn_callback(s1.process_task_queue)
yield producer()
yield s1.q.join()
您的 process_task_queue
函数正在阻止 IOLoop
,因为它不包含任何 yield
语句。它只会在 while True
和 continue
之间永远循环。这里必须使用异步队列,等待任务可用时使用yield
:http://www.tornadoweb.org/en/stable/queues.html#tornado.queues.Queue
我在 class
的某些方法中有永远的 while 循环@gen.coroutine
def process_task_queue(self):
while True:
#print ("Hello from async while")
if not self.q:
continue
#just example
据我了解 "while" 是非阻塞相对龙卷风 io_loop。
比我用 tornado.testing
创建单元测试class TestMSGB(AsyncTestCase):
#@gen_test
def test_init(self):
start = time.time()
def done():
self.io_loop.stop()
print "Done"
duration = time.time() - start
#print "While worked for {} seconds".format(duration)
print "+++++++++ Test init +++++++++ "
s1 = Stream(iol=self.io_loop)
self.io_loop.current().spawn_callback(s1.process_task_queue)
self.io_loop.call_later(time.time() + 1, done) # try this way
self.io_loop.handle_callback_exception(done) # also try this way
self.io_loop.start()
正如我所料,它必须在1秒后停止,但它继续在while中。那么我如何做一个异步 while 循环,它在龙卷风 io_loop 停止时停止并且可以与 while class?
的其他实例并行工作更新: 修改代码但测试也被阻止:
@gen.coroutine
def process_task_queue(self):
print ("Hello from async while")
while True:
item = yield self.q.get()
print ("while iter... item is {}".format(item))
try:
print('Doing work on %s' % item)
yield gen.sleep(0.1)
finally:
q.task_done()
和测试用例:
@gen_test
def test_init(self):
start = time.time()
print "+++++++++ Test init +++++++++ "
s1 = Stream(iol=self.io_loop)
def producer():
for item in range(5):
yield s1.q.put(item)
print('Put %s' % item)
# as in example
self.io_loop.current().spawn_callback(s1.process_task_queue)
yield producer()
yield s1.q.join()
您的 process_task_queue
函数正在阻止 IOLoop
,因为它不包含任何 yield
语句。它只会在 while True
和 continue
之间永远循环。这里必须使用异步队列,等待任务可用时使用yield
:http://www.tornadoweb.org/en/stable/queues.html#tornado.queues.Queue