Tornado PeriodicCallback does not work when using a larger callback_time (e.g. 200 ms)

I have a websocket server application that uses Tornado's PeriodicCallback to send messages to each websocket client:

ioloop.PeriodicCallback(dispatch, 10).start()
ioloop.IOLoop.instance().start()

The dispatch() function has a loop that consumes RabbitMQ messages and forwards them to each websocket client.

def dispatch():
    global channel, queue
    time_start = time.time()
    # Poll the queue for about one second; the IOLoop is blocked during that time.
    while True:
        method_frame = None  # lets the except branch tell whether a message was fetched
        try:
            method_frame, header_frame, body = channel.basic_get(queue)
            if method_frame:
                message = json.loads(body.decode('utf-8'))
                # The URI path selects which clients receive the message.
                if 'websocket_uri' in message:
                    websocket_uri = message['websocket_uri']
                    uri = urlparse(websocket_uri)
                    path = uri.path
                else:
                    path = ''
                if 'payload' in message:
                    payload = json.dumps(message['payload'])
                else:
                    payload = ''
                for client in clients:
                    if client.path == path:
                        client.write_message(payload)
                        logger.info('WRITE: %s: %s' % (client.path, payload))
                channel.basic_ack(method_frame.delivery_tag)
        except Exception as e:
            logger.exception(str(e))
            # Only nack when a message was actually received.
            if method_frame:
                channel.basic_nack(method_frame.delivery_tag)
        finally:
            time_end = time.time()

        if time_end - time_start > 1:
            break
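
The parsing step inside dispatch() can be pulled into a small function so it can be checked without RabbitMQ or Tornado. This is only a sketch: parse_message is a hypothetical name that does not appear in the original code, and it assumes the message body is UTF-8 encoded JSON, as in the code above.

```python
import json
from urllib.parse import urlparse


def parse_message(body):
    """Return (path, payload) for a raw message body given as bytes.

    parse_message is a hypothetical helper, not part of the original code.
    It mirrors the parsing done inside dispatch().
    """
    message = json.loads(body.decode('utf-8'))
    # The path of websocket_uri selects the target clients; '' if absent.
    if 'websocket_uri' in message:
        path = urlparse(message['websocket_uri']).path
    else:
        path = ''
    # The payload is re-serialized to a JSON string; '' if absent.
    if 'payload' in message:
        payload = json.dumps(message['payload'])
    else:
        payload = ''
    return path, payload
```

dispatch() could then call path, payload = parse_message(body) and keep only the client loop and the ack/nack handling.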

Somehow, when I use a larger callback_time value (such as 100 ms or 200 ms), not all messages are forwarded to the websocket clients. With a smaller value (such as 10 ms or 1 ms), it works.

How does PeriodicCallback work? How can I make sure Tornado always calls the dispatch() function?

Thanks
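
On how PeriodicCallback schedules its runs: the Tornado documentation states that if the callback runs for longer than callback_time, subsequent invocations are skipped to get back on schedule. The sketch below is a simplified model of that rule, not Tornado's own code. The names next_timeout and simulate are hypothetical, and all times are integer milliseconds.

```python
def next_timeout(scheduled, now, period):
    """Return the next due time after a run that was due at `scheduled`
    and finished at `now` (hypothetical helper, integer milliseconds)."""
    if scheduled <= now:
        # The callback overran: skip every slot that has already passed.
        return scheduled + ((now - scheduled) // period + 1) * period
    return scheduled + period


def simulate(period, duration, total):
    """Return the start time of every run within `total` milliseconds,
    for a callback that always takes `duration` milliseconds."""
    starts = []
    scheduled = period  # the first run is due one period after start()
    now = 0
    while scheduled <= total:
        now = max(now, scheduled)  # the loop waits until the run is due
        starts.append(now)
        now += duration            # the callback blocks the loop while it runs
        scheduled = next_timeout(scheduled, now, period)
    return starts
```

In this model, a callback that takes 1000 ms (like the dispatch() above) starts every 1200 ms with a 200 ms period and every 1010 ms with a 10 ms period. Whether that difference accounts for the missing messages is not established here.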

I found a solution. I replaced PeriodicCallback with add_callback:

app.listen(9001)
mainloop = ioloop.IOLoop.current()
mainloop.add_callback(dispatch)
mainloop.start()

Then I call add_callback at the end of dispatch(), so that dispatch() is called again on the next I/O loop iteration.

def dispatch():
    global channel, queue
    # Drain the queue: keep fetching until basic_get returns no message.
    while True:
        method_frame = None  # lets the except branch tell whether a message was fetched
        try:
            method_frame, header_frame, body = channel.basic_get(queue)
            if method_frame:
                message = json.loads(body.decode('utf-8'))
                if 'websocket_uri' in message:
                    websocket_uri = message['websocket_uri']
                    uri = urlparse(websocket_uri)
                    path = uri.path
                else:
                    path = ''
                if 'payload' in message:
                    payload = json.dumps(message['payload'])
                    logger.info('Payload: %s' % payload)
                else:
                    payload = ''
                for client in clients:
                    logger.info('Path: %s' % client.path)
                    if client.path == path:
                        client.write_message(payload)
                        logger.info('WRITE: %s: %s' % (client.path, payload))
                channel.basic_ack(method_frame.delivery_tag)
            else:
                # Queue is empty: stop and hand control back to the IOLoop.
                break
        except Exception as e:
            logger.exception(str(e))
            # Only nack when a message was actually received.
            if method_frame:
                channel.basic_nack(method_frame.delivery_tag)

    # Reschedule dispatch() for the next IOLoop iteration.
    ioloop.IOLoop.current().add_callback(dispatch)
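
The drain loop above can also be written against any object that offers basic_get, basic_ack and basic_nack, which makes it testable without a broker. This is a minimal sketch, not the code I actually run: drain, handle, max_messages and FakeChannel are hypothetical names. The max_messages cap is an added assumption so that a single pass cannot hold the IOLoop indefinitely.

```python
from types import SimpleNamespace


def drain(channel, queue, handle, max_messages=100):
    """Fetch up to max_messages from `queue`, pass each body to `handle`,
    and return how many messages were processed (hypothetical helper)."""
    processed = 0
    while processed < max_messages:
        method_frame, header_frame, body = channel.basic_get(queue)
        if not method_frame:
            break  # queue is empty
        try:
            handle(body)
            channel.basic_ack(method_frame.delivery_tag)
        except Exception:
            channel.basic_nack(method_frame.delivery_tag)
        processed += 1  # failed messages count too, so the cap always applies
    return processed


class FakeChannel:
    """In-memory stand-in for a blocking channel, for illustration only."""

    def __init__(self, bodies):
        self.pending = list(bodies)
        self.acked = []
        self.nacked = []
        self.next_tag = 1

    def basic_get(self, queue):
        if not self.pending:
            return None, None, None
        frame = SimpleNamespace(delivery_tag=self.next_tag)
        self.next_tag += 1
        return frame, None, self.pending.pop(0)

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)

    def basic_nack(self, delivery_tag):
        self.nacked.append(delivery_tag)
```

With this shape, dispatch() would call drain(channel, queue, handle) and then reschedule itself with add_callback as before.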