如何在 PyQt5 中使用 QWebSocket 创建 websocket 客户端
How to create a websocket client by using QWebSocket in PyQt5
我想在 PyQt5.For 方便的情况下使用 QWebSocket 创建一个 websocket 客户端,假设我有一个 websocket 服务器,源代码是这样的,
from PyQt5 import QtCore, QtWebSockets, QtNetwork, QtGui
from PyQt5.QtWidgets import QApplication, QMainWindow, QMenu, QAction
from PyQt5.QtCore import QUrl
class MyServer(QtCore.QObject):
def __init__(self, parent):
super(QtCore.QObject, self).__init__(parent)
self.clients = []
self.server = QtWebSockets.QWebSocketServer(parent.serverName(), parent.secureMode(), parent)
if self.server.listen(QtNetwork.QHostAddress.LocalHost, 1302):
print('Connected: '+self.server.serverName()+' : '
+self.server.serverAddress().toString()+':'+str(self.server.serverPort()))
else:
print('error')
self.server.newConnection.connect(self.onNewConnection)
self.clientConnection = None
print(self.server.isListening())
def onNewConnection(self):
self.clientConnection = self.server.nextPendingConnection()
self.clientConnection.textMessageReceived.connect(self.processTextMessage)
self.clientConnection.binaryMessageReceived.connect(self.processBinaryMessage)
self.clientConnection.disconnected.connect(self.socketDisconnected)
print("newClient")
self.clients.append(self.clientConnection)
def processTextMessage(self, message):
print(message)
if self.clientConnection:
for client in self.clients:
# if client!= self.clientConnection:
client.sendTextMessage(message)
# self.clientConnection.sendTextMessage(message)
def processBinaryMessage(self, message):
print("b:",message)
if self.clientConnection:
self.clientConnection.sendBinaryMessage(message)
def socketDisconnected(self):
if self.clientConnection:
self.clients.remove(self.clientConnection)
self.clientConnection.deleteLater()
if __name__ == '__main__':
import sys
app = QApplication(sys.argv)
serverObject = QtWebSockets.QWebSocketServer('My Socket', QtWebSockets.QWebSocketServer.NonSecureMode)
server = MyServer(serverObject)
serverObject.closed.connect(app.quit)
app.exec_()
它可以创建一个 websocket 服务器,我使用 JavaScript 测试它,它工作正常。但是我可以找到一种方法来使用 Qwebsocket.My 创建客户端,客户端代码是这样的:
client = QtWebSockets.QWebSocket("",QtWebSockets.QWebSocketProtocol.Version13,None)
client.open(QUrl("ws://127.0.0.1:1302"))
client.sendTextMessage("asd")
client.close()
服务端好像没有收到客户端发送的消息,请问如何创建一个websocket客户端并使用Qwebsocket发送消息?
这是 qt 控制台程序的典型问题,您需要在 python 构造函数 (__init__
) 之外调用您的客户端方法。
我稍微修改了你的服务器,添加了一些错误测试(没什么新东西):
from PyQt5 import QtCore, QtWebSockets, QtNetwork, QtGui
from PyQt5.QtWidgets import QApplication, QMainWindow, QMenu, QAction
from PyQt5.QtCore import QUrl
class MyServer(QtCore.QObject):
def __init__(self, parent):
super(QtCore.QObject, self).__init__(parent)
self.clients = []
print("server name: {}".format(parent.serverName()))
self.server = QtWebSockets.QWebSocketServer(parent.serverName(), parent.secureMode(), parent)
if self.server.listen(QtNetwork.QHostAddress.LocalHost, 1302):
print('Listening: {}:{}:{}'.format(
self.server.serverName(), self.server.serverAddress().toString(),
str(self.server.serverPort())))
else:
print('error')
self.server.acceptError.connect(self.onAcceptError)
self.server.newConnection.connect(self.onNewConnection)
self.clientConnection = None
print(self.server.isListening())
def onAcceptError(accept_error):
print("Accept Error: {}".format(accept_error))
def onNewConnection(self):
print("onNewConnection")
self.clientConnection = self.server.nextPendingConnection()
self.clientConnection.textMessageReceived.connect(self.processTextMessage)
self.clientConnection.textFrameReceived.connect(self.processTextFrame)
self.clientConnection.binaryMessageReceived.connect(self.processBinaryMessage)
self.clientConnection.disconnected.connect(self.socketDisconnected)
print("newClient")
self.clients.append(self.clientConnection)
def processTextFrame(self, frame, is_last_frame):
print("in processTextFrame")
print("\tFrame: {} ; is_last_frame: {}".format(frame, is_last_frame))
def processTextMessage(self, message):
print("processTextMessage - message: {}".format(message))
if self.clientConnection:
for client in self.clients:
# if client!= self.clientConnection:
client.sendTextMessage(message)
# self.clientConnection.sendTextMessage(message)
def processBinaryMessage(self, message):
print("b:",message)
if self.clientConnection:
self.clientConnection.sendBinaryMessage(message)
def socketDisconnected(self):
print("socketDisconnected")
if self.clientConnection:
self.clients.remove(self.clientConnection)
self.clientConnection.deleteLater()
if __name__ == '__main__':
import sys
app = QApplication(sys.argv)
serverObject = QtWebSockets.QWebSocketServer('My Socket', QtWebSockets.QWebSocketServer.NonSecureMode)
server = MyServer(serverObject)
serverObject.closed.connect(app.quit)
app.exec_()
客户端使用一些QTimer
来调用__init__
方法之外的所需方法。我还添加了 ping / pong 方法来检查连接:
import sys
from PyQt5 import QtCore, QtWebSockets, QtNetwork
from PyQt5.QtCore import QUrl, QCoreApplication, QTimer
from PyQt5.QtWidgets import QApplication
class Client(QtCore.QObject):
def __init__(self, parent):
super().__init__(parent)
self.client = QtWebSockets.QWebSocket("",QtWebSockets.QWebSocketProtocol.Version13,None)
self.client.error.connect(self.error)
self.client.open(QUrl("ws://127.0.0.1:1302"))
self.client.pong.connect(self.onPong)
def do_ping(self):
print("client: do_ping")
self.client.ping(b"foo")
def send_message(self):
print("client: send_message")
self.client.sendTextMessage("asd")
def onPong(self, elapsedTime, payload):
print("onPong - time: {} ; payload: {}".format(elapsedTime, payload))
def error(self, error_code):
print("error code: {}".format(error_code))
print(self.client.errorString())
def close(self):
self.client.close()
def quit_app():
print("timer timeout - exiting")
QCoreApplication.quit()
def ping():
client.do_ping()
def send_message():
client.send_message()
if __name__ == '__main__':
global client
app = QApplication(sys.argv)
QTimer.singleShot(2000, ping)
QTimer.singleShot(3000, send_message)
QTimer.singleShot(5000, quit_app)
client = Client(app)
app.exec_()
服务器输出:
G:\Qt\QtTests>python so_qwebsocket_server.py
server name: My Socket
Listening: My Socket:127.0.0.1:1302
True
onNewConnection
newClient
in processTextFrame
Frame: asd ; is_last_frame: True
processTextMessage - message: asd
socketDisconnected
客户端输出:
G:\Qt\QtTests>python so_qwebsocket_client.py
client: do_ping
onPong - time: 0 ; payload: b'foo'
client: send_message
timer timeout
总而言之,如果您在简单的 GUI 中使用您的客户端(例如,将 client.sendTextMessage()
移到 __init__
之外并连接一个按钮,单击以实际发送消息),由于其异步性质, 它应该没有任何问题!
我想在 PyQt5.For 方便的情况下使用 QWebSocket 创建一个 websocket 客户端,假设我有一个 websocket 服务器,源代码是这样的,
from PyQt5 import QtCore, QtWebSockets, QtNetwork, QtGui
from PyQt5.QtWidgets import QApplication, QMainWindow, QMenu, QAction
from PyQt5.QtCore import QUrl
class MyServer(QtCore.QObject):
def __init__(self, parent):
super(QtCore.QObject, self).__init__(parent)
self.clients = []
self.server = QtWebSockets.QWebSocketServer(parent.serverName(), parent.secureMode(), parent)
if self.server.listen(QtNetwork.QHostAddress.LocalHost, 1302):
print('Connected: '+self.server.serverName()+' : '
+self.server.serverAddress().toString()+':'+str(self.server.serverPort()))
else:
print('error')
self.server.newConnection.connect(self.onNewConnection)
self.clientConnection = None
print(self.server.isListening())
def onNewConnection(self):
self.clientConnection = self.server.nextPendingConnection()
self.clientConnection.textMessageReceived.connect(self.processTextMessage)
self.clientConnection.binaryMessageReceived.connect(self.processBinaryMessage)
self.clientConnection.disconnected.connect(self.socketDisconnected)
print("newClient")
self.clients.append(self.clientConnection)
def processTextMessage(self, message):
print(message)
if self.clientConnection:
for client in self.clients:
# if client!= self.clientConnection:
client.sendTextMessage(message)
# self.clientConnection.sendTextMessage(message)
def processBinaryMessage(self, message):
print("b:",message)
if self.clientConnection:
self.clientConnection.sendBinaryMessage(message)
def socketDisconnected(self):
if self.clientConnection:
self.clients.remove(self.clientConnection)
self.clientConnection.deleteLater()
if __name__ == '__main__':
import sys
app = QApplication(sys.argv)
serverObject = QtWebSockets.QWebSocketServer('My Socket', QtWebSockets.QWebSocketServer.NonSecureMode)
server = MyServer(serverObject)
serverObject.closed.connect(app.quit)
app.exec_()
它可以创建一个 websocket 服务器,我使用 JavaScript 测试它,它工作正常。但是我可以找到一种方法来使用 Qwebsocket.My 创建客户端,客户端代码是这样的:
client = QtWebSockets.QWebSocket("",QtWebSockets.QWebSocketProtocol.Version13,None)
client.open(QUrl("ws://127.0.0.1:1302"))
client.sendTextMessage("asd")
client.close()
服务端好像没有收到客户端发送的消息,请问如何创建一个websocket客户端并使用Qwebsocket发送消息?
这是 qt 控制台程序的典型问题,您需要在 python 构造函数 (__init__
) 之外调用您的客户端方法。
我稍微修改了你的服务器,添加了一些错误测试(没什么新东西):
from PyQt5 import QtCore, QtWebSockets, QtNetwork, QtGui
from PyQt5.QtWidgets import QApplication, QMainWindow, QMenu, QAction
from PyQt5.QtCore import QUrl
class MyServer(QtCore.QObject):
def __init__(self, parent):
super(QtCore.QObject, self).__init__(parent)
self.clients = []
print("server name: {}".format(parent.serverName()))
self.server = QtWebSockets.QWebSocketServer(parent.serverName(), parent.secureMode(), parent)
if self.server.listen(QtNetwork.QHostAddress.LocalHost, 1302):
print('Listening: {}:{}:{}'.format(
self.server.serverName(), self.server.serverAddress().toString(),
str(self.server.serverPort())))
else:
print('error')
self.server.acceptError.connect(self.onAcceptError)
self.server.newConnection.connect(self.onNewConnection)
self.clientConnection = None
print(self.server.isListening())
def onAcceptError(accept_error):
print("Accept Error: {}".format(accept_error))
def onNewConnection(self):
print("onNewConnection")
self.clientConnection = self.server.nextPendingConnection()
self.clientConnection.textMessageReceived.connect(self.processTextMessage)
self.clientConnection.textFrameReceived.connect(self.processTextFrame)
self.clientConnection.binaryMessageReceived.connect(self.processBinaryMessage)
self.clientConnection.disconnected.connect(self.socketDisconnected)
print("newClient")
self.clients.append(self.clientConnection)
def processTextFrame(self, frame, is_last_frame):
print("in processTextFrame")
print("\tFrame: {} ; is_last_frame: {}".format(frame, is_last_frame))
def processTextMessage(self, message):
print("processTextMessage - message: {}".format(message))
if self.clientConnection:
for client in self.clients:
# if client!= self.clientConnection:
client.sendTextMessage(message)
# self.clientConnection.sendTextMessage(message)
def processBinaryMessage(self, message):
print("b:",message)
if self.clientConnection:
self.clientConnection.sendBinaryMessage(message)
def socketDisconnected(self):
print("socketDisconnected")
if self.clientConnection:
self.clients.remove(self.clientConnection)
self.clientConnection.deleteLater()
if __name__ == '__main__':
import sys
app = QApplication(sys.argv)
serverObject = QtWebSockets.QWebSocketServer('My Socket', QtWebSockets.QWebSocketServer.NonSecureMode)
server = MyServer(serverObject)
serverObject.closed.connect(app.quit)
app.exec_()
客户端使用一些QTimer
来调用__init__
方法之外的所需方法。我还添加了 ping / pong 方法来检查连接:
import sys
from PyQt5 import QtCore, QtWebSockets, QtNetwork
from PyQt5.QtCore import QUrl, QCoreApplication, QTimer
from PyQt5.QtWidgets import QApplication
class Client(QtCore.QObject):
def __init__(self, parent):
super().__init__(parent)
self.client = QtWebSockets.QWebSocket("",QtWebSockets.QWebSocketProtocol.Version13,None)
self.client.error.connect(self.error)
self.client.open(QUrl("ws://127.0.0.1:1302"))
self.client.pong.connect(self.onPong)
def do_ping(self):
print("client: do_ping")
self.client.ping(b"foo")
def send_message(self):
print("client: send_message")
self.client.sendTextMessage("asd")
def onPong(self, elapsedTime, payload):
print("onPong - time: {} ; payload: {}".format(elapsedTime, payload))
def error(self, error_code):
print("error code: {}".format(error_code))
print(self.client.errorString())
def close(self):
self.client.close()
def quit_app():
print("timer timeout - exiting")
QCoreApplication.quit()
def ping():
client.do_ping()
def send_message():
client.send_message()
if __name__ == '__main__':
global client
app = QApplication(sys.argv)
QTimer.singleShot(2000, ping)
QTimer.singleShot(3000, send_message)
QTimer.singleShot(5000, quit_app)
client = Client(app)
app.exec_()
服务器输出:
G:\Qt\QtTests>python so_qwebsocket_server.py
server name: My Socket
Listening: My Socket:127.0.0.1:1302
True
onNewConnection
newClient
in processTextFrame
Frame: asd ; is_last_frame: True
processTextMessage - message: asd
socketDisconnected
客户端输出:
G:\Qt\QtTests>python so_qwebsocket_client.py
client: do_ping
onPong - time: 0 ; payload: b'foo'
client: send_message
timer timeout
总而言之,如果您在简单的 GUI 中使用您的客户端(例如,将 client.sendTextMessage()
移到 __init__
之外并连接一个按钮,单击以实际发送消息),由于其异步性质, 它应该没有任何问题!