在 Tornado 中创建处理队列
Creating a processing queue in Tornado
我正在使用 Tornado 网络服务器排队需要在 request/response 周期之外处理的项目。
在我下面的简化示例中,每次收到请求时,我都会将一个新字符串添加到名为 queued_items
的列表中。我想创建一些东西来监视该列表并处理其中显示的项目。
(在我的真实代码中,项目是通过 TCP 套接字处理和发送的,当网络请求到达时,该套接字可能连接也可能不连接。我希望网络服务器继续排队,而不管套接字连接如何)
我试图让这段代码保持简单并且不使用像 Redis 或 Beanstalk 这样的外部 queues/programs。它不会有很高的音量。
使用 Tornado 习语观察 client.queued_items
列表中的新项目并在它们到达时处理它们的好方法是什么?
import time
import tornado.ioloop
import tornado.gen
import tornado.web
class Client():
def __init__(self):
self.queued_items = []
@tornado.gen.coroutine
def watch_queue(self):
# I have no idea what I'm doing
items = yield client.queued_items
# go_do_some_thing_with_items(items)
class IndexHandler(tornado.web.RequestHandler):
def get(self):
client.queued_items.append("%f" % time.time())
self.write("Queued a new item")
if __name__ == "__main__":
client = Client()
# Watch the queue for when new items show up
client.watch_queue()
# Create the web server
application = tornado.web.Application([
(r'/', IndexHandler),
], debug=True)
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
有一个名为 toro
的库,它为 tornado
提供同步原语。 [更新:从 tornado 4.2 开始,toro
已合并到 tornado
。]
听起来你可以使用 toro.Queue
(或 tornado
4.2+ 中的 tornado.queues.Queue
)来处理这个问题:
import time
import toro
import tornado.ioloop
import tornado.gen
import tornado.web
class Client():
def __init__(self):
self.queued_items = toro.Queue()
@tornado.gen.coroutine
def watch_queue(self):
while True:
items = yield self.queued_items.get()
# go_do_something_with_items(items)
class IndexHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
yield client.queued_items.put("%f" % time.time())
self.write("Queued a new item")
if __name__ == "__main__":
client = Client()
# Watch the queue for when new items show up
tornado.ioloop.IOLoop.current().add_callback(client.watch_queue)
# Create the web server
application = tornado.web.Application([
(r'/', IndexHandler),
], debug=True)
application.listen(8888)
tornado.ioloop.IOLoop.current().start()
除了将数据结构从列表切换为 toro.Queue
:
之外,还需要进行一些调整
- 我们需要使用
add_callback
在 IOLoop 内部将 watch_queue
调度到 运行,而不是尝试在 IOLoop 上下文之外直接调用它。
IndexHandler.get
需要转成协程,因为toro.Queue.put
是协程。
我还在 watch_queue
中添加了一个 while True
循环,这样它将永远 运行,而不是只处理一个项目然后退出。
我正在使用 Tornado 网络服务器排队需要在 request/response 周期之外处理的项目。
在我下面的简化示例中,每次收到请求时,我都会将一个新字符串添加到名为 queued_items
的列表中。我想创建一些东西来监视该列表并处理其中显示的项目。
(在我的真实代码中,项目是通过 TCP 套接字处理和发送的,当网络请求到达时,该套接字可能连接也可能不连接。我希望网络服务器继续排队,而不管套接字连接如何)
我试图让这段代码保持简单并且不使用像 Redis 或 Beanstalk 这样的外部 queues/programs。它不会有很高的音量。
使用 Tornado 习语观察 client.queued_items
列表中的新项目并在它们到达时处理它们的好方法是什么?
import time
import tornado.ioloop
import tornado.gen
import tornado.web
class Client():
def __init__(self):
self.queued_items = []
@tornado.gen.coroutine
def watch_queue(self):
# I have no idea what I'm doing
items = yield client.queued_items
# go_do_some_thing_with_items(items)
class IndexHandler(tornado.web.RequestHandler):
def get(self):
client.queued_items.append("%f" % time.time())
self.write("Queued a new item")
if __name__ == "__main__":
client = Client()
# Watch the queue for when new items show up
client.watch_queue()
# Create the web server
application = tornado.web.Application([
(r'/', IndexHandler),
], debug=True)
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
有一个名为 toro
的库,它为 tornado
提供同步原语。 [更新:从 tornado 4.2 开始,toro
已合并到 tornado
。]
听起来你可以使用 toro.Queue
(或 tornado
4.2+ 中的 tornado.queues.Queue
)来处理这个问题:
import time
import toro
import tornado.ioloop
import tornado.gen
import tornado.web
class Client():
def __init__(self):
self.queued_items = toro.Queue()
@tornado.gen.coroutine
def watch_queue(self):
while True:
items = yield self.queued_items.get()
# go_do_something_with_items(items)
class IndexHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
yield client.queued_items.put("%f" % time.time())
self.write("Queued a new item")
if __name__ == "__main__":
client = Client()
# Watch the queue for when new items show up
tornado.ioloop.IOLoop.current().add_callback(client.watch_queue)
# Create the web server
application = tornado.web.Application([
(r'/', IndexHandler),
], debug=True)
application.listen(8888)
tornado.ioloop.IOLoop.current().start()
除了将数据结构从列表切换为 toro.Queue
:
- 我们需要使用
add_callback
在 IOLoop 内部将watch_queue
调度到 运行,而不是尝试在 IOLoop 上下文之外直接调用它。 IndexHandler.get
需要转成协程,因为toro.Queue.put
是协程。
我还在 watch_queue
中添加了一个 while True
循环,这样它将永远 运行,而不是只处理一个项目然后退出。