使用较大的 callback_time(即:200 毫秒)时,Tornado PeriodicCallback 不起作用
Tornado PeriodicCallback does not works when using larger callback_time (ie: 200ms)
我有一个 websocket 服务器应用程序,它使用 Tornado 向每个 websocket 客户端发送消息 PeriodicCallback
。
ioloop.PeriodicCallback(dispatch, 10).start()
ioloop.IOLoop.instance().start()
dispatch()
函数有一个循环来使用 RabbitMQ 消息,然后将它们转发给每个 websocket 客户端。
def dispatch():
global channel, queue
time_start = time.time()
while True:
try:
method_frame, header_frame, body = channel.basic_get(queue)
if method_frame:
message = json.loads(body.decode('utf-8'))
if 'websocket_uri' in message:
websocket_uri = message['websocket_uri']
uri = urlparse(websocket_uri)
path = uri.path
else:
path = ''
if 'payload' in message:
payload = json.dumps(message['payload'])
else:
payload = ''
for client in clients:
if client.path == path:
client.write_message(payload)
logger.info('WRITE: %s: %s' % (client.path, payload))
channel.basic_ack(method_frame.delivery_tag)
except Exception as e:
logger.exception(str(e))
channel.basic_nack(method_frame.delivery_tag)
finally:
time_end = time.time()
if time_end - time_start > 1:
break;
return
不知何故,当我使用更大的 callback_time 值(如 100 毫秒或 200 毫秒)时,并非所有消息都转发到 websocket 客户端。但是,当我使用较小的值(如 10 毫秒或 1 毫秒)时,该功能有效。
PeriodicCallback
是如何运作的?如何确保 Tornado 总是调用 dispatch()
函数?
谢谢
我找到了解决办法。我将 PeriodicCallback
替换为 add_callback
:
app.listen(9001)
mainloop = ioloop.IOLoop.current()
mainloop.add_callback(dispatch)
mainloop.start()
然后在 dispatch()
函数的末尾使用 add_callback
,这样 dispatch()
函数将在下一次 I/O 迭代中被调用。
def dispatch():
global channel, queue
while True:
try:
method_frame, header_frame, body = channel.basic_get(queue)
if method_frame:
message = json.loads(body.decode('utf-8'))
if 'websocket_uri' in message:
websocket_uri = message['websocket_uri']
uri = urlparse(websocket_uri)
path = uri.path
else:
path = ''
if 'payload' in message:
payload = json.dumps(message['payload'])
logger.info('Payload: %s' % payload)
else:
payload = ''
for client in clients:
logger.info('Path: %s' % client.path)
if client.path == path:
client.write_message(payload)
logger.info('WRITE: %s: %s' % (client.path, payload))
channel.basic_ack(method_frame.delivery_tag)
else:
break;
except Exception as e:
logger.exception(str(e))
channel.basic_nack(method_frame.delivery_tag)
ioloop.IOLoop.current().add_callback(dispatch)
return
我有一个 websocket 服务器应用程序,它使用 Tornado 向每个 websocket 客户端发送消息 PeriodicCallback
。
ioloop.PeriodicCallback(dispatch, 10).start()
ioloop.IOLoop.instance().start()
dispatch()
函数有一个循环来使用 RabbitMQ 消息,然后将它们转发给每个 websocket 客户端。
def dispatch():
global channel, queue
time_start = time.time()
while True:
try:
method_frame, header_frame, body = channel.basic_get(queue)
if method_frame:
message = json.loads(body.decode('utf-8'))
if 'websocket_uri' in message:
websocket_uri = message['websocket_uri']
uri = urlparse(websocket_uri)
path = uri.path
else:
path = ''
if 'payload' in message:
payload = json.dumps(message['payload'])
else:
payload = ''
for client in clients:
if client.path == path:
client.write_message(payload)
logger.info('WRITE: %s: %s' % (client.path, payload))
channel.basic_ack(method_frame.delivery_tag)
except Exception as e:
logger.exception(str(e))
channel.basic_nack(method_frame.delivery_tag)
finally:
time_end = time.time()
if time_end - time_start > 1:
break;
return
不知何故,当我使用更大的 callback_time 值(如 100 毫秒或 200 毫秒)时,并非所有消息都转发到 websocket 客户端。但是,当我使用较小的值(如 10 毫秒或 1 毫秒)时,该功能有效。
PeriodicCallback
是如何运作的?如何确保 Tornado 总是调用 dispatch()
函数?
谢谢
我找到了解决办法。我将 PeriodicCallback
替换为 add_callback
:
app.listen(9001)
mainloop = ioloop.IOLoop.current()
mainloop.add_callback(dispatch)
mainloop.start()
然后在 dispatch()
函数的末尾使用 add_callback
,这样 dispatch()
函数将在下一次 I/O 迭代中被调用。
def dispatch():
global channel, queue
while True:
try:
method_frame, header_frame, body = channel.basic_get(queue)
if method_frame:
message = json.loads(body.decode('utf-8'))
if 'websocket_uri' in message:
websocket_uri = message['websocket_uri']
uri = urlparse(websocket_uri)
path = uri.path
else:
path = ''
if 'payload' in message:
payload = json.dumps(message['payload'])
logger.info('Payload: %s' % payload)
else:
payload = ''
for client in clients:
logger.info('Path: %s' % client.path)
if client.path == path:
client.write_message(payload)
logger.info('WRITE: %s: %s' % (client.path, payload))
channel.basic_ack(method_frame.delivery_tag)
else:
break;
except Exception as e:
logger.exception(str(e))
channel.basic_nack(method_frame.delivery_tag)
ioloop.IOLoop.current().add_callback(dispatch)
return