Python Tornado:从非协程中消耗外部队列
Python Tornado: consuming external Queue from not coroutine
我有以下情况:使用 python 3.6 和 Tornado 5.1 通过网络套接字接收客户端请求。其中一些请求需要您调用外部处理,该处理 returns 一个队列,然后定期将结果存入其中。这些结果通过 websocket 传输到客户端。
外部处理不是协同程序,因此我使用 run_in_executor 调用它。
我的问题:
当外部处理的响应时间很大时,run_in_executor达到worker最大数量(default: number of processors x 5)!
增加工人的最大数量安全吗?
或者推荐其他解决方案? !!
下面是简化代码。
从已经非常感谢你了!!!!
#########################
## SERVER CODE ##
#########################
from random import randint
import tornado.httpserver
import tornado.websocket
import tornado.ioloop
import tornado.web
from random import randint
from tornado import gen
import threading
import asyncio
import queue
import time
class WSHandler(tornado.websocket.WebSocketHandler):
"""entry point for all WS request"""
def open(self):
print('new connection. Request: ' + str(self.request))
async def on_message(self, message):
# Emulates the subscription to an external object
# that returns a queue to listen
producer = Producer()
q = producer.q
while True:
rta = await tornado.ioloop.IOLoop.current().run_in_executor(None, self.loop_on_q, q)
if rta != None:
await self.write_message(str(rta))
else:
break
def on_close(self):
print('connection closed. Request: ' + str(self.request) +
'. close_reason: ' + str(self.close_reason) +
'. close_code: ' + str(self.close_code) +
'. get_status: ' + str(self.get_status()))
def loop_on_q(self, q):
rta = q.get()
return rta
class Producer:
def __init__(self):
self.q = queue.Queue()
t = threading.Thread(target=self.start)
t.daemon = True
t.start()
def start(self):
count = 1
while True:
# time.sleep(randint(1,5))
if count < 100:
self.q.put(count)
else:
self.q.put(None)
break
time.sleep(50)
count += 1
application = tornado.web.Application([
(r'/ws', WSHandler),
])
if __name__ == "__main__":
asyncio.set_event_loop(asyncio.new_event_loop())
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8888)
print('SRV START')
tornado.ioloop.IOLoop.instance().instance().start()
#########################
## CLIENT CODE ##
#########################
# If you run it more than 20 times in less than 50 seconds ==> Block
# (number of processors x 5), I have 4 cores
from websocket import create_connection
def conect():
url = 'ws://localhost:8888/ws'
ws = create_connection(url)
print('Conecting')
return ws
print('Conecting to srv')
con_ws = conect()
print('Established connection. Sending msg ...')
msj = '{"type":"Socket"}'
con_ws.send(msj)
print('Package sent. Waiting answer...')
while True:
result = con_ws.recv()
print('Answer: ' + str(result))
增加工人的最大数量是否安全是的,最多可以通过负载测试计算出一定的固定数量。
或者推荐另一种解决方案?如果达到工作人员限制,您可以将工作人员移动到多个独立的服务器(这种方法称为水平缩放)并通过消息将作业传递给他们队列。如果您喜欢自己编写所有内容,请将 Celery 视为包含电池的解决方案或 RabbitMQ、Kafka 等。
我有以下情况:使用 python 3.6 和 Tornado 5.1 通过网络套接字接收客户端请求。其中一些请求需要您调用外部处理,该处理 returns 一个队列,然后定期将结果存入其中。这些结果通过 websocket 传输到客户端。
外部处理不是协同程序,因此我使用 run_in_executor 调用它。
我的问题: 当外部处理的响应时间很大时,run_in_executor达到worker最大数量(default: number of processors x 5)!
增加工人的最大数量安全吗? 或者推荐其他解决方案? !!
下面是简化代码。
从已经非常感谢你了!!!!
#########################
## SERVER CODE ##
#########################
from random import randint
import tornado.httpserver
import tornado.websocket
import tornado.ioloop
import tornado.web
from random import randint
from tornado import gen
import threading
import asyncio
import queue
import time
class WSHandler(tornado.websocket.WebSocketHandler):
"""entry point for all WS request"""
def open(self):
print('new connection. Request: ' + str(self.request))
async def on_message(self, message):
# Emulates the subscription to an external object
# that returns a queue to listen
producer = Producer()
q = producer.q
while True:
rta = await tornado.ioloop.IOLoop.current().run_in_executor(None, self.loop_on_q, q)
if rta != None:
await self.write_message(str(rta))
else:
break
def on_close(self):
print('connection closed. Request: ' + str(self.request) +
'. close_reason: ' + str(self.close_reason) +
'. close_code: ' + str(self.close_code) +
'. get_status: ' + str(self.get_status()))
def loop_on_q(self, q):
rta = q.get()
return rta
class Producer:
def __init__(self):
self.q = queue.Queue()
t = threading.Thread(target=self.start)
t.daemon = True
t.start()
def start(self):
count = 1
while True:
# time.sleep(randint(1,5))
if count < 100:
self.q.put(count)
else:
self.q.put(None)
break
time.sleep(50)
count += 1
application = tornado.web.Application([
(r'/ws', WSHandler),
])
if __name__ == "__main__":
asyncio.set_event_loop(asyncio.new_event_loop())
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8888)
print('SRV START')
tornado.ioloop.IOLoop.instance().instance().start()
#########################
## CLIENT CODE ##
#########################
# If you run it more than 20 times in less than 50 seconds ==> Block
# (number of processors x 5), I have 4 cores
from websocket import create_connection
def conect():
url = 'ws://localhost:8888/ws'
ws = create_connection(url)
print('Conecting')
return ws
print('Conecting to srv')
con_ws = conect()
print('Established connection. Sending msg ...')
msj = '{"type":"Socket"}'
con_ws.send(msj)
print('Package sent. Waiting answer...')
while True:
result = con_ws.recv()
print('Answer: ' + str(result))
增加工人的最大数量是否安全是的,最多可以通过负载测试计算出一定的固定数量。
或者推荐另一种解决方案?如果达到工作人员限制,您可以将工作人员移动到多个独立的服务器(这种方法称为水平缩放)并通过消息将作业传递给他们队列。如果您喜欢自己编写所有内容,请将 Celery 视为包含电池的解决方案或 RabbitMQ、Kafka 等。