到 Tornado 服务器的多个异步 HTTP 连接
Multiple Async HTTP connections to Tornado Server
我有一个龙卷风服务器,我正试图使其同步。我有一个同时向服务器发出异步请求的客户端。它每 5 秒通过心跳对服务器执行一次 ping 操作,其次,它会尽可能地为作业发出 GET 请求。
在服务器端,有一个包含作业的线程安全队列。如果队列为空,它将阻塞 20 秒。我希望它保持连接并阻塞那 20 秒,当它 return 时,它将 "No job" 写入客户端。一旦作业可用,它应该立即将其写入客户端,因为 queue.get() 会 return。当此请求被阻止时,我希望心跳继续在后台发生。这里我从同一个客户端向服务器发出两个异步请求。
这是我构建的一个模拟我的问题的示例项目。
服务器:
import tornado.ioloop
import tornado.web
from queue import Queue
from tornado import gen
q = Queue()
class HeartBeatHandler(tornado.web.RequestHandler):
@gen.coroutine
def post(self):
print("Heart beat")
class JobHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
print("Job")
try:
job = yield q.get(block=True, timeout=20)
self.write(job)
except Exception as e:
self.write("No job")
def make_app():
return tornado.web.Application([
(r"/heartbeat", HeartBeatHandler),
(r"/job", JobHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
try:
tornado.ioloop.IOLoop.current().start()
except KeyboardInterrupt:
tornado.ioloop.IOLoop.current().stop()
客户:
import asyncio
from tornado import httpclient, gen
@gen.coroutine
def heartbeat_routine():
while True:
http_client = httpclient.AsyncHTTPClient()
heartbeat_request = httpclient.HTTPRequest("http://{}/heartbeat".format("127.0.0.1:8888"), method="POST",
body="")
try:
yield http_client.fetch(heartbeat_request)
yield asyncio.sleep(5)
except httpclient.HTTPError as e:
print("Heartbeat failed!\nError: {}".format(str(e)))
http_client.close()
@gen.coroutine
def worker_routine():
while True:
http_client = httpclient.AsyncHTTPClient(defaults=dict(request_timeout=180))
job_request = httpclient.HTTPRequest("http://{}/job".format("127.0.0.1:8888"), method="GET")
try:
response = yield http_client.fetch(job_request)
print(response.body)
except httpclient.HTTPError as e:
print("Heartbeat failed!\nError: {}".format(str(e)))
http_client.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
asyncio.ensure_future(heartbeat_routine())
asyncio.ensure_future(worker_routine())
loop.run_forever()
问题:
- 问题是心跳也阻塞了那 20 秒
而 queue.get() 块。我不想要。
- 正如您在我的客户端中看到的那样,我将请求超时设置为 180 秒。但是那个
似乎从来没有与龙卷风一起工作。如果将 queue.get() 超时增加到 20 以上
秒,它 returns 错误代码说请求超时。
如果您使用线程安全队列,则必须使用不使用来自 IOLoop 线程的阻塞操作。相反,运行 它们在线程池中:
job = yield IOLoop.current().run_in_executor(None, lambda: q.get(block=True, timeout=20))
或者,您可以使用 Tornado 的异步(但线程不安全)队列,并在需要与来自另一个线程的队列交互时使用 IOLoop.add_callback
。
AsyncHTTPClient
构造函数中有一些神奇之处,它会尽可能尝试共享现有实例,但这意味着构造函数参数仅在第一次有效。 worker_routine
正在选取由 heartbeat_routine
创建的默认实例。添加 force_instance=True
以确保您在 worker_routine
中获得新客户(并在完成后调用 .close()
)
我有一个龙卷风服务器,我正试图使其同步。我有一个同时向服务器发出异步请求的客户端。它每 5 秒通过心跳对服务器执行一次 ping 操作,其次,它会尽可能地为作业发出 GET 请求。
在服务器端,有一个包含作业的线程安全队列。如果队列为空,它将阻塞 20 秒。我希望它保持连接并阻塞那 20 秒,当它 return 时,它将 "No job" 写入客户端。一旦作业可用,它应该立即将其写入客户端,因为 queue.get() 会 return。当此请求被阻止时,我希望心跳继续在后台发生。这里我从同一个客户端向服务器发出两个异步请求。
这是我构建的一个模拟我的问题的示例项目。
服务器:
import tornado.ioloop
import tornado.web
from queue import Queue
from tornado import gen
q = Queue()
class HeartBeatHandler(tornado.web.RequestHandler):
@gen.coroutine
def post(self):
print("Heart beat")
class JobHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
print("Job")
try:
job = yield q.get(block=True, timeout=20)
self.write(job)
except Exception as e:
self.write("No job")
def make_app():
return tornado.web.Application([
(r"/heartbeat", HeartBeatHandler),
(r"/job", JobHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
try:
tornado.ioloop.IOLoop.current().start()
except KeyboardInterrupt:
tornado.ioloop.IOLoop.current().stop()
客户:
import asyncio
from tornado import httpclient, gen
@gen.coroutine
def heartbeat_routine():
while True:
http_client = httpclient.AsyncHTTPClient()
heartbeat_request = httpclient.HTTPRequest("http://{}/heartbeat".format("127.0.0.1:8888"), method="POST",
body="")
try:
yield http_client.fetch(heartbeat_request)
yield asyncio.sleep(5)
except httpclient.HTTPError as e:
print("Heartbeat failed!\nError: {}".format(str(e)))
http_client.close()
@gen.coroutine
def worker_routine():
while True:
http_client = httpclient.AsyncHTTPClient(defaults=dict(request_timeout=180))
job_request = httpclient.HTTPRequest("http://{}/job".format("127.0.0.1:8888"), method="GET")
try:
response = yield http_client.fetch(job_request)
print(response.body)
except httpclient.HTTPError as e:
print("Heartbeat failed!\nError: {}".format(str(e)))
http_client.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
asyncio.ensure_future(heartbeat_routine())
asyncio.ensure_future(worker_routine())
loop.run_forever()
问题:
- 问题是心跳也阻塞了那 20 秒 而 queue.get() 块。我不想要。
- 正如您在我的客户端中看到的那样,我将请求超时设置为 180 秒。但是那个 似乎从来没有与龙卷风一起工作。如果将 queue.get() 超时增加到 20 以上 秒,它 returns 错误代码说请求超时。
如果您使用线程安全队列,则必须使用不使用来自 IOLoop 线程的阻塞操作。相反,运行 它们在线程池中:
job = yield IOLoop.current().run_in_executor(None, lambda: q.get(block=True, timeout=20))
或者,您可以使用 Tornado 的异步(但线程不安全)队列,并在需要与来自另一个线程的队列交互时使用
IOLoop.add_callback
。AsyncHTTPClient
构造函数中有一些神奇之处,它会尽可能尝试共享现有实例,但这意味着构造函数参数仅在第一次有效。worker_routine
正在选取由heartbeat_routine
创建的默认实例。添加force_instance=True
以确保您在worker_routine
中获得新客户(并在完成后调用.close()
)