高速公路 Python - 未在服务器上调用 onMessage 事件

Autobahn Python - onMessage event not called on server

我对 python 和 websockets 很陌生,我正在使用 Twisted 和 autobahn 为学校项目创建一个 WebSocket 服务器。我正在使用来自 this Python chat example.

的 html/javascript 客户端测试此服务器

服务器本身通过 key(sessionID):client 字典跟踪连接的客户端。当调用 OnMessage() 时,我读取该值,创建一个 json,然后我使用 Kafka 发送它以进行其他处理。

服务器在我的本地主机上运行良好,使用我们学校提供的 Ubuntu 服务器上的 Kafka。但是,在我尝试在 Ubuntu 服务器上部署此 WebSocket 服务器后,我无法让 OnMessage() 事件发生。

我得到了 OnOpen()、OnClose() 事件,但是 OnMessage(以及 OnMessageBegin)事件从未被调用,无论我用 client/server 做什么。

我尝试通过 iptables 和 ufw 关闭防火墙,但没有成功。由于此服务器的限制,仅开放 8000 到 9000 范围内的端口。 autobahn/twisted 使用 websockets 时,这可能是一个问题吗?

这是我用于 WebSocket 服务器协议的代码:

class MiddleServerProtocol(WebSocketServerProtocol):

def onConnect(self, request):
    logger.info("Client connecting: {0}".format(request.peer))

def onOpen(self):
    identifier = uuid.uuid4()
    self.factory.register(self, identifier)
    logger.info("WebSocket connection open, Session ID: " + identifier)

def onMessageBegin(self, isBinary):
    super(MiddleServerProtocol, self).onMessageBegin(isBinary)


def onMessage(self, payload, is_binary):
    key = str(list(clients.keys())[list(clients.values()).index(self)])
    if is_binary:
        logger.error("Invalid message recieved: {0} bytes".
                     format(len(payload)) + " , Session ID: " + key)
    else:
        # SEND MESSAGE TO KAFKA
        message = prepare_message(payload, key).encode("utf-8")
        self.factory.producer_kafka.send(configLoad.
                                         get_value('TOPIC_SEND'),
                                         value=json.loads(message),
                                         key=key.encode('utf-8'))
        self.factory.producer_kafka.flush()
        logger.info("Json message received: " + message.
                    decode('cp1250') + " ,Session ID: " + key)

def onClose(self, was_clean, code, reason):
    key = str(list(clients.keys())[list(clients.values()).index(self)])
    logger.info("WebSocket connection closed: {0}".
                format(reason) + " , Session ID: " + key)

这是我用于 Websocket Server Factory 的代码:

class MiddleServerFactory(WebSocketServerFactory):

def __init__(self, *args, **kwargs):
    super(MiddleServerFactory, self).__init__(*args, **kwargs)
    self.producer_kafka = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                        bootstrap_servers=configLoad.
                                        get_value('BOOTSTRAPSERVER'))
    logger.info(" Server successfully started.")

def register(self, client, identifier):
    """
   Add client to list of managed connections.
   We use generated ID as a unique identifier.
   """
    clients[identifier] = client

def unregister(self, identifier):
    """
   Remove client from list of managed connections.
   """
    del clients[identifier]

我是这样 运行 的:

factory = MiddleServerFactory(configLoad.get_value('URL'))
factory.protocol = MiddleServerProtocol
reactor.listenTCP(int(configLoad.get_value('PORT')), factory)
reactor.run()

感谢您提前提供的所有帮助和建议。

对于仍然想知道的人:

这个问题是我没有在服务器和客户端之间使用 HTTPS (WSS) 通信,因为服务器本身在代理后面,这弄乱了 HTTP headers。为了解决这个问题,我不得不切换到 HTTPS (WSS) 通信,这显然使 HTTP headers 加密并且代理不会与它们混淆。