Tornado gen 协程中的 Tornado 异步作业
Tornado asynchronous job in tornado gen coroutine
我编写了一个从队列中获取作业并异步执行的应用程序。
def job(self):
print 'In job'
time.sleep(0.01)
@gen.coroutine
def start_jobs(self):
jobs = filter(lambda x: x['status'] == 0, self.queue)
for job in jobs:
yield self.job()
print 'exit from start job'
但是,此代码不起作用。
输出:
在职
在职
在工作等
如何正确操作?
我如何让它与 Futures 一起工作,有没有更简单的方法来与 Tornado 一起工作?
切勿在 Tornado 中调用 time.sleep
!请改用 yield gen.sleep
。
使用 pip install toro
安装 Toro 并使用 JoinableQueue:
import random
from tornado import ioloop, gen
import toro
class C(object):
def __init__(self):
self.queue = toro.JoinableQueue()
@gen.coroutine
def start_jobs(self):
while True:
job_id = yield self.queue.get()
self.job(job_id)
@gen.coroutine
def job(self, job_id):
print 'job_id', job_id
yield gen.sleep(random.random())
print 'job_id', job_id, 'done'
self.queue.task_done()
c = C()
for i in range(5):
c.queue.put_nowait(i)
c.start_jobs()
io_loop = ioloop.IOLoop.instance()
# block until all tasks are done
c.queue.join().add_done_callback(lambda future: io_loop.stop())
io_loop.start()
从 Tornado 4.2 开始,Toro 是 Tornado 的一部分,因此您可以 queue = tornado.queues.Queue()
而不是使用 Toro JoinableQueue:
我编写了一个从队列中获取作业并异步执行的应用程序。
def job(self):
print 'In job'
time.sleep(0.01)
@gen.coroutine
def start_jobs(self):
jobs = filter(lambda x: x['status'] == 0, self.queue)
for job in jobs:
yield self.job()
print 'exit from start job'
但是,此代码不起作用。
输出:
在职
在职
在工作等
如何正确操作?
我如何让它与 Futures 一起工作,有没有更简单的方法来与 Tornado 一起工作?
切勿在 Tornado 中调用 time.sleep
!请改用 yield gen.sleep
。
使用 pip install toro
安装 Toro 并使用 JoinableQueue:
import random
from tornado import ioloop, gen
import toro
class C(object):
def __init__(self):
self.queue = toro.JoinableQueue()
@gen.coroutine
def start_jobs(self):
while True:
job_id = yield self.queue.get()
self.job(job_id)
@gen.coroutine
def job(self, job_id):
print 'job_id', job_id
yield gen.sleep(random.random())
print 'job_id', job_id, 'done'
self.queue.task_done()
c = C()
for i in range(5):
c.queue.put_nowait(i)
c.start_jobs()
io_loop = ioloop.IOLoop.instance()
# block until all tasks are done
c.queue.join().add_done_callback(lambda future: io_loop.stop())
io_loop.start()
从 Tornado 4.2 开始,Toro 是 Tornado 的一部分,因此您可以 queue = tornado.queues.Queue()
而不是使用 Toro JoinableQueue: