如何将数据包发送到对话框?
How to send packet data to dialog?
我用pyqt5制作了一些接收到的tcp包对话框。
这段代码是load dialog,在qthread中调用receive tcp packet。
我想发送数据包到对话框。
如何发送?
这是我的代码。
import sys
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import socketserver
class MyTCPHandler(socketserver.StreamRequestHandler):
def handle(self):
try:
data = self.rfile.read(28)
# how to send packet data to dialog?
except Exception as e:
print('MyTCPHandler.handle exception error: ', e)
class TestThread(QThread):
HOST, PORT = '192.168.0.100', 8484
def __init__(self, parent=None):
super().__init__()
def receive_packet(self):
socketserver.TCPServer.allow_reuse_address = True
server = socketserver.TCPServer((self.HOST, self.PORT), MyTCPHandler)
server.serve_forever()
def run(self):
print('run thread')
self.receive_packet()
class TestGUI(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.btn1 = QPushButton("start thread", self)
self.textbox1 = QLineEdit(self)
vertBox = QVBoxLayout()
vertBox.addWidget(self.btn1)
vertBox.addWidget(self.textbox1)
self.setLayout(vertBox)
self.setGeometry(700, 500, 300, 100)
self.btn1.clicked.connect(self.threadStart)
self.show()
self.th = TestThread(self)
@pyqtSlot()
def threadStart(self):
self.th.start()
if __name__ == '__main__':
app = QApplication(sys.argv)
form = TestGUI()
app.exec_()
发送一个线程的信息可以使用QMetaObject.invokeMethod()
,所以需要通过GUI,在这种情况下我们利用QThread的父级是GUI。
您可以设置一个新的 属性 以将 GUI 传递给处理程序,并从处理程序访问 属性 到 self.server。
最后,我们实现了一个接收信息的插槽。
import sys
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import socketserver
class MyTCPHandler(socketserver.StreamRequestHandler):
def handle(self):
try:
data = self.rfile.read(28)
QMetaObject.invokeMethod(self.server.w, "setData",
Qt.QueuedConnection, Q_ARG(bytes, data))
except Exception as e:
print('MyTCPHandler.handle exception error: ', e)
class TestThread(QThread):
HOST, PORT = '192.168.0.102', 8000
def __init__(self, parent=None):
super().__init__(parent)
def receive_packet(self):
socketserver.TCPServer.allow_reuse_address = True
server = socketserver.TCPServer((self.HOST, self.PORT), MyTCPHandler)
server.w = self.parent()
server.serve_forever()
def run(self):
print('run thread')
self.receive_packet()
class TestGUI(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.btn1 = QPushButton("start thread", self)
self.textbox1 = QLineEdit(self)
vertBox = QVBoxLayout()
vertBox.addWidget(self.btn1)
vertBox.addWidget(self.textbox1)
self.setLayout(vertBox)
self.setGeometry(700, 500, 300, 100)
self.btn1.clicked.connect(self.threadStart)
self.show()
self.th = TestThread(self)
@pyqtSlot(bytes)
def setData(self, data):
print(data)
@pyqtSlot()
def threadStart(self):
self.th.start()
if __name__ == '__main__':
app = QApplication(sys.argv)
form = TestGUI()
sys.exit(app.exec_())
@Plus: 有信号
import sys
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import socketserver
class MyTCPHandler(socketserver.StreamRequestHandler):
def handle(self):
try:
data = self.rfile.read(28)
self.server.qthread.dataChanged.emit(data)
except Exception as e:
print('MyTCPHandler.handle exception error: ', e)
class TestThread(QThread):
HOST, PORT = '192.168.0.102', 8000
dataChanged = pyqtSignal(bytes)
def __init__(self, parent=None):
super().__init__(parent)
def receive_packet(self):
socketserver.TCPServer.allow_reuse_address = True
server = socketserver.TCPServer((self.HOST, self.PORT), MyTCPHandler)
server.qthread = self
server.serve_forever()
def run(self):
print('run thread')
self.receive_packet()
class TestGUI(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.btn1 = QPushButton("start thread", self)
self.textbox1 = QLineEdit(self)
vertBox = QVBoxLayout()
vertBox.addWidget(self.btn1)
vertBox.addWidget(self.textbox1)
self.setLayout(vertBox)
self.setGeometry(700, 500, 300, 100)
self.btn1.clicked.connect(self.threadStart)
self.show()
self.th = TestThread(self)
self.th.dataChanged.connect(self.setData)
@pyqtSlot(bytes)
def setData(self, data):
print(data)
@pyqtSlot()
def threadStart(self):
self.th.start()
if __name__ == '__main__':
app = QApplication(sys.argv)
form = TestGUI()
sys.exit(app.exec_())
我用pyqt5制作了一些接收到的tcp包对话框。
这段代码是load dialog,在qthread中调用receive tcp packet。
我想发送数据包到对话框。
如何发送?
这是我的代码。
import sys
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import socketserver
class MyTCPHandler(socketserver.StreamRequestHandler):
def handle(self):
try:
data = self.rfile.read(28)
# how to send packet data to dialog?
except Exception as e:
print('MyTCPHandler.handle exception error: ', e)
class TestThread(QThread):
HOST, PORT = '192.168.0.100', 8484
def __init__(self, parent=None):
super().__init__()
def receive_packet(self):
socketserver.TCPServer.allow_reuse_address = True
server = socketserver.TCPServer((self.HOST, self.PORT), MyTCPHandler)
server.serve_forever()
def run(self):
print('run thread')
self.receive_packet()
class TestGUI(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.btn1 = QPushButton("start thread", self)
self.textbox1 = QLineEdit(self)
vertBox = QVBoxLayout()
vertBox.addWidget(self.btn1)
vertBox.addWidget(self.textbox1)
self.setLayout(vertBox)
self.setGeometry(700, 500, 300, 100)
self.btn1.clicked.connect(self.threadStart)
self.show()
self.th = TestThread(self)
@pyqtSlot()
def threadStart(self):
self.th.start()
if __name__ == '__main__':
app = QApplication(sys.argv)
form = TestGUI()
app.exec_()
发送一个线程的信息可以使用QMetaObject.invokeMethod()
,所以需要通过GUI,在这种情况下我们利用QThread的父级是GUI。
您可以设置一个新的 属性 以将 GUI 传递给处理程序,并从处理程序访问 属性 到 self.server。
最后,我们实现了一个接收信息的插槽。
import sys
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import socketserver
class MyTCPHandler(socketserver.StreamRequestHandler):
def handle(self):
try:
data = self.rfile.read(28)
QMetaObject.invokeMethod(self.server.w, "setData",
Qt.QueuedConnection, Q_ARG(bytes, data))
except Exception as e:
print('MyTCPHandler.handle exception error: ', e)
class TestThread(QThread):
HOST, PORT = '192.168.0.102', 8000
def __init__(self, parent=None):
super().__init__(parent)
def receive_packet(self):
socketserver.TCPServer.allow_reuse_address = True
server = socketserver.TCPServer((self.HOST, self.PORT), MyTCPHandler)
server.w = self.parent()
server.serve_forever()
def run(self):
print('run thread')
self.receive_packet()
class TestGUI(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.btn1 = QPushButton("start thread", self)
self.textbox1 = QLineEdit(self)
vertBox = QVBoxLayout()
vertBox.addWidget(self.btn1)
vertBox.addWidget(self.textbox1)
self.setLayout(vertBox)
self.setGeometry(700, 500, 300, 100)
self.btn1.clicked.connect(self.threadStart)
self.show()
self.th = TestThread(self)
@pyqtSlot(bytes)
def setData(self, data):
print(data)
@pyqtSlot()
def threadStart(self):
self.th.start()
if __name__ == '__main__':
app = QApplication(sys.argv)
form = TestGUI()
sys.exit(app.exec_())
@Plus: 有信号
import sys
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import socketserver
class MyTCPHandler(socketserver.StreamRequestHandler):
def handle(self):
try:
data = self.rfile.read(28)
self.server.qthread.dataChanged.emit(data)
except Exception as e:
print('MyTCPHandler.handle exception error: ', e)
class TestThread(QThread):
HOST, PORT = '192.168.0.102', 8000
dataChanged = pyqtSignal(bytes)
def __init__(self, parent=None):
super().__init__(parent)
def receive_packet(self):
socketserver.TCPServer.allow_reuse_address = True
server = socketserver.TCPServer((self.HOST, self.PORT), MyTCPHandler)
server.qthread = self
server.serve_forever()
def run(self):
print('run thread')
self.receive_packet()
class TestGUI(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.btn1 = QPushButton("start thread", self)
self.textbox1 = QLineEdit(self)
vertBox = QVBoxLayout()
vertBox.addWidget(self.btn1)
vertBox.addWidget(self.textbox1)
self.setLayout(vertBox)
self.setGeometry(700, 500, 300, 100)
self.btn1.clicked.connect(self.threadStart)
self.show()
self.th = TestThread(self)
self.th.dataChanged.connect(self.setData)
@pyqtSlot(bytes)
def setData(self, data):
print(data)
@pyqtSlot()
def threadStart(self):
self.th.start()
if __name__ == '__main__':
app = QApplication(sys.argv)
form = TestGUI()
sys.exit(app.exec_())