为什么在 Tornado RequestHandler 中调用 __init__ 的异步消费者的行为与静态调用的行为不同?
Why does an async consumer called in __init__ in a Tornado RequestHandler behave differently from statically called?
我正在尝试使用 Tornado 创建一个异步服务器,每个处理程序都有一个唯一的队列。当端点被调用时,作业被放入队列中。我有一个消费者函数,它从队列中异步 'consumes' 作业。但是,消费者的行为往往会根据我将其称为 self.consumer()
还是 AsyncHandler.consumer()
而有所不同。我最初的猜测是这是因为实例级锁定但找不到证据。我连续发出 4 post 个请求。这是 2 个片段及其输出。
import tornado.web
from tornado import gen
from time import sleep, time
from tornado.queues import Queue
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
class AsyncHandler(tornado.web.RequestHandler):
JOB_QUEUE = Queue()
EXECUTOR = ThreadPoolExecutor()
def post(self):
job = lambda: sleep(3) or print("{}:handler called".format(int(time())))
self.JOB_QUEUE.put(job)
self.set_status(200)
self.finish()
@staticmethod
@gen.coroutine
def consumer():
while True:
job = yield AsyncHandler.JOB_QUEUE.get()
print("qsize : {}".format(AsyncHandler.JOB_QUEUE.qsize()))
print(AsyncHandler.JOB_QUEUE)
output = yield AsyncHandler.EXECUTOR.submit(job)
AsyncHandler.JOB_QUEUE.task_done()
if __name__ == "__main__":
AsyncHandler.consumer()
APP = tornado.web.Application([(r"/test", AsyncHandler)])
APP.listen(9000)
IOLoop.current().start()
这给出了预期的输出:
qsize : 0
<Queue maxsize=0 tasks=1>
1508618429:handler called
qsize : 2
<Queue maxsize=0 queue=deque([<function...<lambda> at 0x7fbf8f741400>, <function... <lambda> at 0x7fbf8f760ea0>]) tasks=3>
1508618432:handler called
qsize : 1
<Queue maxsize=0 queue=deque([<function AsyncHandler.post.<locals>.<lambda> at 0x7fbf8f760ea0>]) tasks=2>
1508618435:handler called
qsize : 0
<Queue maxsize=0 tasks=1>
1508618438:handler called
output = yield AsyncHandler.EXECUTOR.submit(job)
需要 3 秒才能 return 输出,因此输出延迟 3 秒到达。同时我们还可以看到队列在增加。
现在来看一段有趣的代码:
import tornado.web
from tornado import gen
from time import sleep, time
from tornado.queues import Queue
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
class AsyncHandler(tornado.web.RequestHandler):
JOB_QUEUE = Queue()
EXECUTOR = ThreadPoolExecutor()
def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
self.consumer()
def post(self):
job = lambda: sleep(3) or print("{}:handler called".format(int(time())))
self.JOB_QUEUE.put(job)
self.set_status(200)
self.finish()
@staticmethod
@gen.coroutine
def consumer():
while True:
job = yield AsyncHandler.JOB_QUEUE.get()
print("qsize : {}".format(AsyncHandler.JOB_QUEUE.qsize()))
print(AsyncHandler.JOB_QUEUE)
output = yield AsyncHandler.EXECUTOR.submit(job)
AsyncHandler.JOB_QUEUE.task_done()
if __name__ == "__main__":
APP = tornado.web.Application([(r"/test", AsyncHandler)])
APP.listen(9000)
IOLoop.current().start()
奇怪(而且令人愉快)的输出看起来像:
qsize : 0
<Queue maxsize=0 tasks=1>
qsize : 0
<Queue maxsize=0 tasks=2>
qsize : 0
<Queue maxsize=0 tasks=3>
qsize : 0
<Queue maxsize=0 tasks=4>
1508619138:handler called
1508619138:handler called
1508619139:handler called
1508619139:handler called
请注意,现在我们在 __init__
中调用消费者。我们可以看到任务建立并并行执行(没有建立队列),几乎同时完成。就好像 output = yield AsyncHandler.EXECUTOR.submit(job)
没有阻塞未来。即使经过大量实验,我也无法解释这种行为。非常感谢您的帮助。
第一个应用程序只有一个运行 consumer
因为它执行了一次。每个请求"blocks"(一次只有一个是循环)消费者,所以下一个将在前一个之后处理。
后一个应用会针对每个请求启动一个新的 consumer
循环(因为 RequestHandler 是根据请求创建的)。所以第一个请求不会 "block" 下一个,因为有全新的 while True
和 get
和 submit
...
我正在尝试使用 Tornado 创建一个异步服务器,每个处理程序都有一个唯一的队列。当端点被调用时,作业被放入队列中。我有一个消费者函数,它从队列中异步 'consumes' 作业。但是,消费者的行为往往会根据我将其称为 self.consumer()
还是 AsyncHandler.consumer()
而有所不同。我最初的猜测是这是因为实例级锁定但找不到证据。我连续发出 4 post 个请求。这是 2 个片段及其输出。
import tornado.web
from tornado import gen
from time import sleep, time
from tornado.queues import Queue
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
class AsyncHandler(tornado.web.RequestHandler):
JOB_QUEUE = Queue()
EXECUTOR = ThreadPoolExecutor()
def post(self):
job = lambda: sleep(3) or print("{}:handler called".format(int(time())))
self.JOB_QUEUE.put(job)
self.set_status(200)
self.finish()
@staticmethod
@gen.coroutine
def consumer():
while True:
job = yield AsyncHandler.JOB_QUEUE.get()
print("qsize : {}".format(AsyncHandler.JOB_QUEUE.qsize()))
print(AsyncHandler.JOB_QUEUE)
output = yield AsyncHandler.EXECUTOR.submit(job)
AsyncHandler.JOB_QUEUE.task_done()
if __name__ == "__main__":
AsyncHandler.consumer()
APP = tornado.web.Application([(r"/test", AsyncHandler)])
APP.listen(9000)
IOLoop.current().start()
这给出了预期的输出:
qsize : 0
<Queue maxsize=0 tasks=1>
1508618429:handler called
qsize : 2
<Queue maxsize=0 queue=deque([<function...<lambda> at 0x7fbf8f741400>, <function... <lambda> at 0x7fbf8f760ea0>]) tasks=3>
1508618432:handler called
qsize : 1
<Queue maxsize=0 queue=deque([<function AsyncHandler.post.<locals>.<lambda> at 0x7fbf8f760ea0>]) tasks=2>
1508618435:handler called
qsize : 0
<Queue maxsize=0 tasks=1>
1508618438:handler called
output = yield AsyncHandler.EXECUTOR.submit(job)
需要 3 秒才能 return 输出,因此输出延迟 3 秒到达。同时我们还可以看到队列在增加。
现在来看一段有趣的代码:
import tornado.web
from tornado import gen
from time import sleep, time
from tornado.queues import Queue
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
class AsyncHandler(tornado.web.RequestHandler):
JOB_QUEUE = Queue()
EXECUTOR = ThreadPoolExecutor()
def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
self.consumer()
def post(self):
job = lambda: sleep(3) or print("{}:handler called".format(int(time())))
self.JOB_QUEUE.put(job)
self.set_status(200)
self.finish()
@staticmethod
@gen.coroutine
def consumer():
while True:
job = yield AsyncHandler.JOB_QUEUE.get()
print("qsize : {}".format(AsyncHandler.JOB_QUEUE.qsize()))
print(AsyncHandler.JOB_QUEUE)
output = yield AsyncHandler.EXECUTOR.submit(job)
AsyncHandler.JOB_QUEUE.task_done()
if __name__ == "__main__":
APP = tornado.web.Application([(r"/test", AsyncHandler)])
APP.listen(9000)
IOLoop.current().start()
奇怪(而且令人愉快)的输出看起来像:
qsize : 0
<Queue maxsize=0 tasks=1>
qsize : 0
<Queue maxsize=0 tasks=2>
qsize : 0
<Queue maxsize=0 tasks=3>
qsize : 0
<Queue maxsize=0 tasks=4>
1508619138:handler called
1508619138:handler called
1508619139:handler called
1508619139:handler called
请注意,现在我们在 __init__
中调用消费者。我们可以看到任务建立并并行执行(没有建立队列),几乎同时完成。就好像 output = yield AsyncHandler.EXECUTOR.submit(job)
没有阻塞未来。即使经过大量实验,我也无法解释这种行为。非常感谢您的帮助。
第一个应用程序只有一个运行 consumer
因为它执行了一次。每个请求"blocks"(一次只有一个是循环)消费者,所以下一个将在前一个之后处理。
后一个应用会针对每个请求启动一个新的 consumer
循环(因为 RequestHandler 是根据请求创建的)。所以第一个请求不会 "block" 下一个,因为有全新的 while True
和 get
和 submit
...