高速公路 Python - 未在服务器上调用 onMessage 事件
Autobahn Python - onMessage event not called on server
我对 python 和 websockets 很陌生,我正在使用 Twisted 和 autobahn 为学校项目创建一个 WebSocket 服务器。我正在使用来自 this Python chat example.
的 html/javascript 客户端测试此服务器
服务器本身通过 key(sessionID):client 字典跟踪连接的客户端。当调用 OnMessage() 时,我读取该值,创建一个 json,然后我使用 Kafka 发送它以进行其他处理。
服务器在我的本地主机上运行良好,使用我们学校提供的 Ubuntu 服务器上的 Kafka。但是,在我尝试在 Ubuntu 服务器上部署此 WebSocket 服务器后,我无法让 OnMessage() 事件发生。
我得到了 OnOpen()、OnClose() 事件,但是 OnMessage(以及 OnMessageBegin)事件从未被调用,无论我用 client/server 做什么。
我尝试通过 iptables 和 ufw 关闭防火墙,但没有成功。由于此服务器的限制,仅开放 8000 到 9000 范围内的端口。 autobahn/twisted 使用 websockets 时,这可能是一个问题吗?
这是我用于 WebSocket 服务器协议的代码:
class MiddleServerProtocol(WebSocketServerProtocol):
def onConnect(self, request):
logger.info("Client connecting: {0}".format(request.peer))
def onOpen(self):
identifier = uuid.uuid4()
self.factory.register(self, identifier)
logger.info("WebSocket connection open, Session ID: " + identifier)
def onMessageBegin(self, isBinary):
super(MiddleServerProtocol, self).onMessageBegin(isBinary)
def onMessage(self, payload, is_binary):
key = str(list(clients.keys())[list(clients.values()).index(self)])
if is_binary:
logger.error("Invalid message recieved: {0} bytes".
format(len(payload)) + " , Session ID: " + key)
else:
# SEND MESSAGE TO KAFKA
message = prepare_message(payload, key).encode("utf-8")
self.factory.producer_kafka.send(configLoad.
get_value('TOPIC_SEND'),
value=json.loads(message),
key=key.encode('utf-8'))
self.factory.producer_kafka.flush()
logger.info("Json message received: " + message.
decode('cp1250') + " ,Session ID: " + key)
def onClose(self, was_clean, code, reason):
key = str(list(clients.keys())[list(clients.values()).index(self)])
logger.info("WebSocket connection closed: {0}".
format(reason) + " , Session ID: " + key)
这是我用于 Websocket Server Factory 的代码:
class MiddleServerFactory(WebSocketServerFactory):
def __init__(self, *args, **kwargs):
super(MiddleServerFactory, self).__init__(*args, **kwargs)
self.producer_kafka = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
bootstrap_servers=configLoad.
get_value('BOOTSTRAPSERVER'))
logger.info(" Server successfully started.")
def register(self, client, identifier):
"""
Add client to list of managed connections.
We use generated ID as a unique identifier.
"""
clients[identifier] = client
def unregister(self, identifier):
"""
Remove client from list of managed connections.
"""
del clients[identifier]
我是这样 运行 的:
factory = MiddleServerFactory(configLoad.get_value('URL'))
factory.protocol = MiddleServerProtocol
reactor.listenTCP(int(configLoad.get_value('PORT')), factory)
reactor.run()
感谢您提前提供的所有帮助和建议。
对于仍然想知道的人:
这个问题是我没有在服务器和客户端之间使用 HTTPS (WSS) 通信,因为服务器本身在代理后面,这弄乱了 HTTP headers。为了解决这个问题,我不得不切换到 HTTPS (WSS) 通信,这显然使 HTTP headers 加密并且代理不会与它们混淆。
我对 python 和 websockets 很陌生,我正在使用 Twisted 和 autobahn 为学校项目创建一个 WebSocket 服务器。我正在使用来自 this Python chat example.
的 html/javascript 客户端测试此服务器服务器本身通过 key(sessionID):client 字典跟踪连接的客户端。当调用 OnMessage() 时,我读取该值,创建一个 json,然后我使用 Kafka 发送它以进行其他处理。
服务器在我的本地主机上运行良好,使用我们学校提供的 Ubuntu 服务器上的 Kafka。但是,在我尝试在 Ubuntu 服务器上部署此 WebSocket 服务器后,我无法让 OnMessage() 事件发生。
我得到了 OnOpen()、OnClose() 事件,但是 OnMessage(以及 OnMessageBegin)事件从未被调用,无论我用 client/server 做什么。
我尝试通过 iptables 和 ufw 关闭防火墙,但没有成功。由于此服务器的限制,仅开放 8000 到 9000 范围内的端口。 autobahn/twisted 使用 websockets 时,这可能是一个问题吗?
这是我用于 WebSocket 服务器协议的代码:
class MiddleServerProtocol(WebSocketServerProtocol):
def onConnect(self, request):
logger.info("Client connecting: {0}".format(request.peer))
def onOpen(self):
identifier = uuid.uuid4()
self.factory.register(self, identifier)
logger.info("WebSocket connection open, Session ID: " + identifier)
def onMessageBegin(self, isBinary):
super(MiddleServerProtocol, self).onMessageBegin(isBinary)
def onMessage(self, payload, is_binary):
key = str(list(clients.keys())[list(clients.values()).index(self)])
if is_binary:
logger.error("Invalid message recieved: {0} bytes".
format(len(payload)) + " , Session ID: " + key)
else:
# SEND MESSAGE TO KAFKA
message = prepare_message(payload, key).encode("utf-8")
self.factory.producer_kafka.send(configLoad.
get_value('TOPIC_SEND'),
value=json.loads(message),
key=key.encode('utf-8'))
self.factory.producer_kafka.flush()
logger.info("Json message received: " + message.
decode('cp1250') + " ,Session ID: " + key)
def onClose(self, was_clean, code, reason):
key = str(list(clients.keys())[list(clients.values()).index(self)])
logger.info("WebSocket connection closed: {0}".
format(reason) + " , Session ID: " + key)
这是我用于 Websocket Server Factory 的代码:
class MiddleServerFactory(WebSocketServerFactory):
def __init__(self, *args, **kwargs):
super(MiddleServerFactory, self).__init__(*args, **kwargs)
self.producer_kafka = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
bootstrap_servers=configLoad.
get_value('BOOTSTRAPSERVER'))
logger.info(" Server successfully started.")
def register(self, client, identifier):
"""
Add client to list of managed connections.
We use generated ID as a unique identifier.
"""
clients[identifier] = client
def unregister(self, identifier):
"""
Remove client from list of managed connections.
"""
del clients[identifier]
我是这样 运行 的:
factory = MiddleServerFactory(configLoad.get_value('URL'))
factory.protocol = MiddleServerProtocol
reactor.listenTCP(int(configLoad.get_value('PORT')), factory)
reactor.run()
感谢您提前提供的所有帮助和建议。
对于仍然想知道的人:
这个问题是我没有在服务器和客户端之间使用 HTTPS (WSS) 通信,因为服务器本身在代理后面,这弄乱了 HTTP headers。为了解决这个问题,我不得不切换到 HTTPS (WSS) 通信,这显然使 HTTP headers 加密并且代理不会与它们混淆。