如何正确组合 PySide2 和 pytransitions 以实现 GUI 应用程序的状态机
How to properly combine PySide2 and pytransitions for implementing a state machine for GUI application
背景:我想使用 PySide2 实现一个 GUI 来控制一堆客户端(通过 RPC 调用与 'servers' 控制硬件,如电机、相机等)。
以前的方法:通常,我要做的是创建我的 GUI 并将 UI 信号连接到客户端插槽,反之亦然。这对于更简单的应用程序非常有效。
问题:我希望我的 GUI 能够正确地反映当前允许对客户端进行的调用。最简单的例子:在执行 client1.doXY() 之后,我想禁用触发该命令的按钮,只有在 doZY() 完成后才重新启用它。
虽然用上述方法完全可以做到,但当事情变得更复杂时,感觉就不对了:例如当 GUI 元素依赖于多个客户端的状态时。
方法:因此,我认为使用有限状态机作为客户端和 GUI 之间的中间层是个好主意,并发现了 pytransitions,它看起来很有前途。
但是,我正在努力寻找将这两个世界结合起来的正确方法。
问题:
一般来说,这是一种有效的设计方法吗?
特别是,如工作代码示例所示,我必须将客户端移动到单独的线程,以避免客户端执行阻塞调用时 GUI 冻结。虽然我的代码可以正常工作,但需要额外创建一些 Qt 信号来连接 ClientState 和 Client 对象。
这可以更优雅地完成吗(即不使用额外的 xy_requested 信号,而是从 ClientState 直接调用 Client 的函数,同时仍让这些函数在 Client 所在的线程而不是主线程中执行)?
工作示例:
代码:
import io
import logging
from time import sleep
import numpy as np
from PySide2 import QtSvg, QtWidgets
from PySide2.QtCore import Signal, Slot, QObject, QThread
from PySide2.QtWidgets import QWidget, QPushButton, QApplication
from transitions.extensions import GraphMachine
logging.basicConfig(level=logging.DEBUG)
class Client(QObject):
    """Demo client whose slots simulate work and report the outcome via signals."""

    # Outcome notifications emitted by the slots below.
    sig_move_done = Signal()
    sig_disconnected = Signal()
    sig_connected = Signal()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @Slot(int)
    def client_move(self, dest):
        """Simulate a blocking move to ``dest``.

        Emits ``sig_disconnected`` on the (random) failure path and
        ``sig_move_done`` otherwise.
        """
        print(f'Client moving to {dest}...')
        sleep(3)  # stands in for some blocking call
        move_failed = np.random.rand() < 0.5
        if move_failed:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
            return
        print("Movement done...")
        self.sig_move_done.emit()

    @Slot()
    def client_disconnect(self):
        """Disconnect (placeholder) and report success via ``sig_disconnected``."""
        self.sig_disconnected.emit()

    @Slot()
    def client_connect(self):
        """Connect (placeholder) and report success via ``sig_connected``."""
        self.sig_connected.emit()
# State names, transition table and extra keyword arguments handed to the
# pytransitions machine.
states = ['ready', 'moving', 'unknown']

transitions = [
    dict(trigger='move', source='ready', dest='moving'),
    dict(trigger='stopped', source='moving', dest='ready'),
    dict(trigger='disconnect_', source=['ready', 'moving'], dest='unknown'),
    dict(trigger='error', source=['ready', 'moving'], dest='unknown'),
    dict(trigger='connect_', source='unknown', dest='ready'),
]

extra_args = {
    'initial': 'unknown',
    'title': 'Simple state machine',
    'show_conditions': True,
    'show_state_attributes': True,
}
class ClientState(QObject):
    """State-machine layer sitting between the GUI and a ``Client``.

    The object is the model of a ``GraphMachine``; the triggers used below
    (``connect_``, ``disconnect_``, ``stopped``) are provided by that machine.
    The client is moved to a worker thread so that its blocking slots do not
    run in the thread that created this object.
    """

    # Emitted after every state change so that views can refresh themselves.
    sig_update_available = Signal()
    # Helper signals used to invoke client slots in the client's thread.
    sig_move_requested = Signal(int)
    sig_connect_requested = Signal()

    def __init__(self, client, *args, **kwargs):
        """Attach the state machine to ``client`` and start its worker thread.

        Args:
            client: the ``Client`` instance to supervise; it is moved to
                ``self.worker_thread``.
            *args, **kwargs: forwarded to ``QObject.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.client = client

        # Move the client to a separate thread.
        self.worker_thread = QThread()
        self.client.moveToThread(self.worker_thread)
        self.worker_thread.start()

        self.machine = GraphMachine(model=self, states=states, transitions=transitions,
                                    show_auto_transitions=False, **extra_args,
                                    after_state_change="update_available",
                                    send_event=True)

        # Client signals drive the state-machine triggers.
        self.client.sig_disconnected.connect(self.disconnect_)
        self.client.sig_connected.connect(self.connect_)
        self.client.sig_move_done.connect(self.stopped)

        # Requests travel to the client through signals so that the slots are
        # executed in the client's thread (see on_enter_moving).
        self.sig_move_requested.connect(self.client.client_move)
        self.sig_connect_requested.connect(self.client.client_connect)

        # The worker thread must be stopped before the application exits;
        # otherwise it is still running when it gets destroyed.
        from PySide2.QtCore import QCoreApplication
        app = QCoreApplication.instance()
        if app is not None:
            app.aboutToQuit.connect(self.stop_worker)

    def update_available(self, *args, **kwargs):
        """``after_state_change`` callback: announce that the state changed.

        Any arguments passed by the machine (the event) are ignored.
        """
        self.sig_update_available.emit()

    def stop_worker(self):
        """Ask the worker thread's event loop to quit and wait for it to finish."""
        if self.worker_thread.isRunning():
            self.worker_thread.quit()
            self.worker_thread.wait()

    def on_enter_moving(self, event):
        """Request the move from the client when the ``moving`` state is entered.

        Args:
            event: machine event; the destination is read from
                ``event.kwargs['dest']`` and defaults to 0.
        """
        print(event.kwargs)
        dest = event.kwargs.get('dest', 0)
        # Calling self.client.client_move() directly would execute it in the
        # calling thread; emitting the helper signal hands it to the client.
        self.sig_move_requested.emit(dest)

    def show_graph(self, **kwargs):
        """Render the machine's graph with ``dot`` and return it as SVG bytes.

        Args:
            **kwargs: forwarded to ``get_graph``.
        """
        stream = io.BytesIO()
        self.get_graph(**kwargs).draw(stream, prog='dot', format='svg')
        return stream.getvalue()
class GUI(QWidget):
    """Window showing the state graph plus buttons that fire machine triggers."""

    def __init__(self, client_state):
        """Build the widgets and wire them to ``client_state``.

        Args:
            client_state: the ``ClientState`` model whose triggers the buttons
                call and whose ``sig_update_available`` refreshes this view.
        """
        super().__init__()
        self.client_state = client_state

        # Set up the UI.
        self.setWindowTitle("State")
        self.svgWidget = QtSvg.QSvgWidget()
        self.btn_move = QPushButton("move")
        self.btn_connect = QPushButton("(re-)connect")

        # Keep the layout in a local: an instance attribute named ``layout``
        # would shadow the inherited QWidget.layout() method.
        box = QtWidgets.QVBoxLayout()
        box.addWidget(self.svgWidget)
        box.addWidget(self.btn_move)
        box.addWidget(self.btn_connect)
        self.setLayout(box)

        # machine -> GUI
        self.client_state.sig_update_available.connect(self.update_gui)
        # GUI -> machine
        self.btn_move.clicked.connect(lambda: self.client_state.move(dest=np.random.randint(1, 100)))
        self.btn_connect.clicked.connect(self.client_state.connect_)

        # Show the initial state.
        self.update_gui()

    def update_gui(self):
        """Redraw the state graph and enable only the buttons valid right now.

        "move" is enabled only in the ``ready`` state and "(re-)connect" only
        in the ``unknown`` state; both are disabled while ``moving``.
        """
        print("Update model graph and GUI...")
        self.svgWidget.load(self.client_state.show_graph())
        self.btn_move.setEnabled(self.client_state.is_ready())
        self.btn_connect.setEnabled(self.client_state.is_unknown())
if __name__ == "__main__":
    import sys

    # The application object is created before any QObject/QWidget.
    app = QApplication(sys.argv)

    client = Client()
    client_state = ClientState(client)

    gui = GUI(client_state)
    gui.show()

    # Run the event loop and use its return code as the exit status.
    exit_code = app.exec_()
    sys.exit(exit_code)
一般来说,引入这样一个中间层是一种有效的设计方法吗?
是的,它是有效的。在复杂的应用程序中之所以使用 FSM,正是因为它们能简化逻辑。
关于简化:依我个人看法(IMHO),我更倾向于先确认 Qt 中是否已有类似的工具,因为它们可以通过事件或信号与 Qt 的元素很好地交互。在这种情况下,至少有 2 个选项:QStateMachine(Qt 状态机框架)和 Qt SCXML,下面依次给出两者的示例:
import time
from functools import partial
from PySide2 import QtCore, QtGui, QtWidgets
import numpy as np
class Client(QtCore.QObject):
    """Demo client whose slots simulate work and report the outcome via signals."""

    # Outcome notifications emitted by the slots below.
    sig_move_done = QtCore.Signal()
    sig_disconnected = QtCore.Signal()
    sig_connected = QtCore.Signal()

    @QtCore.Slot(int)
    def client_move(self, dest):
        """Simulate a blocking move to ``dest``.

        Emits ``sig_disconnected`` on the (random) failure path and
        ``sig_move_done`` otherwise.
        """
        print(f"Client moving to {dest}...")
        time.sleep(3)  # stands in for some blocking call
        move_failed = np.random.rand() < 0.5
        if move_failed:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
            return
        print("Movement done...")
        self.sig_move_done.emit()

    @QtCore.Slot()
    def client_disconnect(self):
        """Disconnect (placeholder) and report success via ``sig_disconnected``."""
        self.sig_disconnected.emit()

    @QtCore.Slot()
    def client_connect(self):
        """Connect (placeholder) and report success via ``sig_connected``."""
        self.sig_connected.emit()
class GUI(QtWidgets.QWidget):
    """Window whose buttons and client are coordinated by a ``QStateMachine``."""

    def __init__(self, parent=None):
        """Create the widgets, the client with its thread, and the state machine."""
        super().__init__(parent)
        self.setWindowTitle("State")

        self.btn_move = QtWidgets.QPushButton("move")
        self.btn_connect = QtWidgets.QPushButton("(re-)connect")

        # The client gets its own thread; the thread is stopped in closeEvent.
        self.client = Client()
        self._thread = QtCore.QThread(self)
        self._thread.start()
        self.client.moveToThread(self._thread)

        box = QtWidgets.QVBoxLayout(self)
        for button in (self.btn_move, self.btn_connect):
            box.addWidget(button)
        self.resize(320, 120)

        # States
        self.unknown_state = QtCore.QState()
        self.ready_state = QtCore.QState()
        self.moving_state = QtCore.QState()

        # Transitions as (source state, triggering signal, target state).
        transition_table = (
            (self.ready_state, self.btn_move.clicked, self.moving_state),
            (self.moving_state, self.client.sig_move_done, self.ready_state),
            (self.ready_state, self.client.sig_disconnected, self.unknown_state),
            (self.moving_state, self.client.sig_disconnected, self.unknown_state),
            (self.unknown_state, self.btn_connect.clicked, self.ready_state),
            (self.unknown_state, self.client.sig_connected, self.ready_state),
        )
        for source, signal, target in transition_table:
            source.addTransition(signal, target)

        # Entry handlers update the buttons (and start the move).
        entry_handlers = (
            (self.unknown_state, self.on_unknown_state_enter),
            (self.ready_state, self.on_ready_state_enter),
            (self.moving_state, self.on_moving_state_enter),
        )
        for state, handler in entry_handlers:
            state.entered.connect(handler)

        state_machine = QtCore.QStateMachine(self)
        for state in (self.ready_state, self.moving_state, self.unknown_state):
            state_machine.addState(state)
        state_machine.setInitialState(self.unknown_state)
        state_machine.start()

    def on_unknown_state_enter(self):
        """Entering ``unknown``: only "(re-)connect" is available."""
        print("unknown_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setEnabled(True)

    def on_ready_state_enter(self):
        """Entering ``ready``: only "move" is available."""
        print("ready_state")
        self.btn_move.setEnabled(True)
        self.btn_connect.setDisabled(True)

    def on_moving_state_enter(self):
        """Entering ``moving``: disable both buttons and start a random move."""
        print("moving_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setDisabled(True)
        target = np.random.randint(1, 100)
        # NOTE(review): the intent is for client_move to run in the client's
        # thread; confirm that PySide2 resolves the receiver of a
        # functools.partial to self.client -- otherwise the blocking call
        # executes in the GUI thread.
        QtCore.QTimer.singleShot(0, partial(self.client.client_move, target))

    def closeEvent(self, event):
        """Stop the worker thread and wait for it before the window closes."""
        self._thread.quit()
        self._thread.wait()
        super().closeEvent(event)
if __name__ == "__main__":
    import sys

    # The application object is created before any widget.
    app = QtWidgets.QApplication(sys.argv)

    w = GUI()
    w.show()

    # Run the event loop and use its return code as the exit status.
    exit_code = app.exec_()
    sys.exit(exit_code)
Simple_State_Machine.scxml
<?xml version="1.0" encoding="UTF-8"?>
<scxml xmlns="http://www.w3.org/2005/07/scxml" version="1.0" binding="early" xmlns:qt="http://www.qt.io/2015/02/scxml-ext" name="Simple_State_Machine" qt:editorversion="4.10.0" initial="unknown">
<qt:editorinfo initialGeometry="150.82;359.88;-20;-20;40;40"/>
<state id="ready">
<qt:editorinfo stateColor="#ff974f" geometry="425.83;190.46;-60;-50;120;100" scenegeometry="425.83;190.46;365.83;140.46;120;100"/>
<transition type="internal" event="move" target="moving">
<qt:editorinfo endTargetFactors="35.02;9.52" movePoint="-34.84;14.59" startTargetFactors="32.33;90.16"/>
</transition>
<transition type="internal" event="disconnect" target="unknown">
<qt:editorinfo endTargetFactors="91.87;60.92" movePoint="9.38;9.36" startTargetFactors="6.25;63.37"/>
</transition>
</state>
<state id="unknown">
<qt:editorinfo stateColor="#89725b" geometry="150.82;190.46;-60;-50;120;100" scenegeometry="150.82;190.46;90.82;140.46;120;100"/>
<transition type="internal" target="ready" event="connect">
<qt:editorinfo endTargetFactors="6.34;41.14" movePoint="0;7.30" startTargetFactors="91.13;39.41"/>
</transition>
</state>
<state id="moving">
<qt:editorinfo stateColor="#a508d0" geometry="425.83;344.53;-60;-50;120;100" scenegeometry="425.83;344.53;365.83;294.53;120;100"/>
<transition type="internal" event="disconnect" target="unknown">
<qt:editorinfo movePoint="2.08;17.72"/>
</transition>
<transition type="internal" event="stopped" target="ready">
<qt:editorinfo endTargetFactors="68.30;90.08" movePoint="62.50;10.32" startTargetFactors="68.69;5.74"/>
</transition>
</state>
</scxml>
import os
import time
from functools import partial
from PySide2 import QtCore, QtGui, QtWidgets, QtScxml
import numpy as np
class Client(QtCore.QObject):
    """Demo client whose slots simulate work and report the outcome via signals."""

    # Outcome notifications emitted by the slots below.
    sig_move_done = QtCore.Signal()
    sig_disconnected = QtCore.Signal()
    sig_connected = QtCore.Signal()

    @QtCore.Slot(int)
    def client_move(self, dest):
        """Simulate a blocking move to ``dest``.

        Emits ``sig_disconnected`` on the (random) failure path and
        ``sig_move_done`` otherwise.
        """
        print(f"Client moving to {dest}...")
        time.sleep(3)  # stands in for some blocking call
        move_failed = np.random.rand() < 0.5
        if move_failed:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
            return
        print("Movement done...")
        self.sig_move_done.emit()

    @QtCore.Slot()
    def client_disconnect(self):
        """Disconnect (placeholder) and report success via ``sig_disconnected``."""
        self.sig_disconnected.emit()

    @QtCore.Slot()
    def client_connect(self):
        """Connect (placeholder) and report success via ``sig_connected``."""
        self.sig_connected.emit()
class GUI(QtWidgets.QWidget):
    """Window whose buttons and client are coordinated by an SCXML state machine."""

    def __init__(self, parent=None):
        """Create the widgets, the client with its thread, and load the machine.

        The machine is read from ``Simple_State_Machine.scxml`` located next
        to this file; parse errors are printed.
        """
        super().__init__(parent)
        self.setWindowTitle("State")

        self.btn_move = QtWidgets.QPushButton("move")
        self.btn_connect = QtWidgets.QPushButton("(re-)connect")

        # The client gets its own thread; the thread is stopped in closeEvent.
        self.client = Client()
        self._thread = QtCore.QThread(self)
        self._thread.start()
        self.client.moveToThread(self._thread)

        box = QtWidgets.QVBoxLayout(self)
        for button in (self.btn_move, self.btn_connect):
            box.addWidget(button)
        self.resize(320, 120)

        filename = os.path.join(
            os.path.dirname(os.path.realpath(__file__)),
            "Simple_State_Machine.scxml",
        )
        machine = QtScxml.QScxmlStateMachine.fromFile(filename)
        machine.setParent(self)
        for error in machine.parseErrors():
            print(error.toString())

        # State name -> slot signature notified when the state (de)activates.
        state_slots = (
            ("unknown", "on_unknown_state_enter(bool)"),
            ("ready", "on_ready_state_enter(bool)"),
            ("moving", "on_moving_state_enter(bool)"),
        )
        for state_name, slot_signature in state_slots:
            machine.connectToState(state_name, self, QtCore.SLOT(slot_signature))

        # Signal -> name of the event submitted to the machine.
        event_sources = (
            (self.btn_connect.clicked, "connect"),
            (self.btn_move.clicked, "move"),
            (self.client.sig_disconnected, "disconnect"),
            (self.client.sig_connected, "connect"),
            (self.client.sig_move_done, "stopped"),
        )
        for signal, event_name in event_sources:
            signal.connect(partial(machine.submitEvent, event_name))

        machine.start()

    @QtCore.Slot(bool)
    def on_unknown_state_enter(self, active):
        """When ``unknown`` becomes active: only "(re-)connect" is available."""
        if not active:
            return
        print("unknown_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setEnabled(True)

    @QtCore.Slot(bool)
    def on_ready_state_enter(self, active):
        """When ``ready`` becomes active: only "move" is available."""
        if not active:
            return
        print("ready_state")
        self.btn_move.setEnabled(True)
        self.btn_connect.setDisabled(True)

    @QtCore.Slot(bool)
    def on_moving_state_enter(self, active):
        """When ``moving`` becomes active: disable both buttons and start a random move."""
        if not active:
            return
        print("moving_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setDisabled(True)
        target = np.random.randint(1, 100)
        # NOTE(review): the intent is for client_move to run in the client's
        # thread; confirm that PySide2 resolves the receiver of a
        # functools.partial to self.client -- otherwise the blocking call
        # executes in the GUI thread.
        QtCore.QTimer.singleShot(0, partial(self.client.client_move, target))

    def closeEvent(self, event):
        """Stop the worker thread and wait for it before the window closes."""
        self._thread.quit()
        self._thread.wait()
        super().closeEvent(event)
if __name__ == "__main__":
    import sys

    # The application object is created before any widget.
    app = QtWidgets.QApplication(sys.argv)

    w = GUI()
    w.show()

    # Run the event loop and use its return code as the exit status.
    exit_code = app.exec_()
    sys.exit(exit_code)
背景:我想使用 PySide2 实现一个 GUI 来控制一堆客户端(通过 RPC 调用与 'servers' 控制硬件,如电机、相机等)。
以前的方法:通常,我要做的是创建我的 GUI 并将 UI 信号连接到客户端插槽,反之亦然。这对于更简单的应用程序非常有效。
问题:我希望我的 GUI 能够正确地反映当前允许对客户端进行的调用。最简单的例子:在执行 client1.doXY() 之后,我想禁用触发该命令的按钮,只有在 doZY() 完成后才重新启用它。
虽然用上述方法完全可以做到,但当事情变得更复杂时,感觉就不对了:例如当 GUI 元素依赖于多个客户端的状态时。
方法:因此,我认为使用有限状态机作为客户端和 GUI 之间的中间层是个好主意,并发现了 pytransitions,它看起来很有前途。但是,我正在努力寻找将这两个世界结合起来的正确方法。
问题:
一般来说,这是一种有效的设计方法吗?
特别是,如工作代码示例所示,我必须将客户端移动到单独的线程,以避免客户端执行阻塞调用时 GUI 冻结。虽然我的代码可以正常工作,但需要额外创建一些 Qt 信号来连接 ClientState 和 Client 对象。
这可以更优雅地完成吗(即不使用额外的 xy_requested 信号,而是从 ClientState 直接调用 Client 的函数,同时仍让这些函数在 Client 所在的线程而不是主线程中执行)?
工作示例:
代码:
import io
import logging
from time import sleep
import numpy as np
from PySide2 import QtSvg, QtWidgets
from PySide2.QtCore import Signal, Slot, QObject, QThread
from PySide2.QtWidgets import QWidget, QPushButton, QApplication
from transitions.extensions import GraphMachine
logging.basicConfig(level=logging.DEBUG)
class Client(QObject):
    """Demo client whose slots simulate work and report the outcome via signals."""

    # Outcome notifications emitted by the slots below.
    sig_move_done = Signal()
    sig_disconnected = Signal()
    sig_connected = Signal()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @Slot(int)
    def client_move(self, dest):
        """Simulate a blocking move to ``dest``.

        Emits ``sig_disconnected`` on the (random) failure path and
        ``sig_move_done`` otherwise.
        """
        print(f'Client moving to {dest}...')
        sleep(3)  # stands in for some blocking call
        move_failed = np.random.rand() < 0.5
        if move_failed:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
            return
        print("Movement done...")
        self.sig_move_done.emit()

    @Slot()
    def client_disconnect(self):
        """Disconnect (placeholder) and report success via ``sig_disconnected``."""
        self.sig_disconnected.emit()

    @Slot()
    def client_connect(self):
        """Connect (placeholder) and report success via ``sig_connected``."""
        self.sig_connected.emit()
# State names, transition table and extra keyword arguments handed to the
# pytransitions machine.
states = ['ready', 'moving', 'unknown']

transitions = [
    dict(trigger='move', source='ready', dest='moving'),
    dict(trigger='stopped', source='moving', dest='ready'),
    dict(trigger='disconnect_', source=['ready', 'moving'], dest='unknown'),
    dict(trigger='error', source=['ready', 'moving'], dest='unknown'),
    dict(trigger='connect_', source='unknown', dest='ready'),
]

extra_args = {
    'initial': 'unknown',
    'title': 'Simple state machine',
    'show_conditions': True,
    'show_state_attributes': True,
}
class ClientState(QObject):
    """State-machine layer sitting between the GUI and a ``Client``.

    The object is the model of a ``GraphMachine``; the triggers used below
    (``connect_``, ``disconnect_``, ``stopped``) are provided by that machine.
    The client is moved to a worker thread so that its blocking slots do not
    run in the thread that created this object.
    """

    # Emitted after every state change so that views can refresh themselves.
    sig_update_available = Signal()
    # Helper signals used to invoke client slots in the client's thread.
    sig_move_requested = Signal(int)
    sig_connect_requested = Signal()

    def __init__(self, client, *args, **kwargs):
        """Attach the state machine to ``client`` and start its worker thread.

        Args:
            client: the ``Client`` instance to supervise; it is moved to
                ``self.worker_thread``.
            *args, **kwargs: forwarded to ``QObject.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.client = client

        # Move the client to a separate thread.
        self.worker_thread = QThread()
        self.client.moveToThread(self.worker_thread)
        self.worker_thread.start()

        self.machine = GraphMachine(model=self, states=states, transitions=transitions,
                                    show_auto_transitions=False, **extra_args,
                                    after_state_change="update_available",
                                    send_event=True)

        # Client signals drive the state-machine triggers.
        self.client.sig_disconnected.connect(self.disconnect_)
        self.client.sig_connected.connect(self.connect_)
        self.client.sig_move_done.connect(self.stopped)

        # Requests travel to the client through signals so that the slots are
        # executed in the client's thread (see on_enter_moving).
        self.sig_move_requested.connect(self.client.client_move)
        self.sig_connect_requested.connect(self.client.client_connect)

        # The worker thread must be stopped before the application exits;
        # otherwise it is still running when it gets destroyed.
        from PySide2.QtCore import QCoreApplication
        app = QCoreApplication.instance()
        if app is not None:
            app.aboutToQuit.connect(self.stop_worker)

    def update_available(self, *args, **kwargs):
        """``after_state_change`` callback: announce that the state changed.

        Any arguments passed by the machine (the event) are ignored.
        """
        self.sig_update_available.emit()

    def stop_worker(self):
        """Ask the worker thread's event loop to quit and wait for it to finish."""
        if self.worker_thread.isRunning():
            self.worker_thread.quit()
            self.worker_thread.wait()

    def on_enter_moving(self, event):
        """Request the move from the client when the ``moving`` state is entered.

        Args:
            event: machine event; the destination is read from
                ``event.kwargs['dest']`` and defaults to 0.
        """
        print(event.kwargs)
        dest = event.kwargs.get('dest', 0)
        # Calling self.client.client_move() directly would execute it in the
        # calling thread; emitting the helper signal hands it to the client.
        self.sig_move_requested.emit(dest)

    def show_graph(self, **kwargs):
        """Render the machine's graph with ``dot`` and return it as SVG bytes.

        Args:
            **kwargs: forwarded to ``get_graph``.
        """
        stream = io.BytesIO()
        self.get_graph(**kwargs).draw(stream, prog='dot', format='svg')
        return stream.getvalue()
class GUI(QWidget):
    """Window showing the state graph plus buttons that fire machine triggers."""

    def __init__(self, client_state):
        """Build the widgets and wire them to ``client_state``.

        Args:
            client_state: the ``ClientState`` model whose triggers the buttons
                call and whose ``sig_update_available`` refreshes this view.
        """
        super().__init__()
        self.client_state = client_state

        # Set up the UI.
        self.setWindowTitle("State")
        self.svgWidget = QtSvg.QSvgWidget()
        self.btn_move = QPushButton("move")
        self.btn_connect = QPushButton("(re-)connect")

        # Keep the layout in a local: an instance attribute named ``layout``
        # would shadow the inherited QWidget.layout() method.
        box = QtWidgets.QVBoxLayout()
        box.addWidget(self.svgWidget)
        box.addWidget(self.btn_move)
        box.addWidget(self.btn_connect)
        self.setLayout(box)

        # machine -> GUI
        self.client_state.sig_update_available.connect(self.update_gui)
        # GUI -> machine
        self.btn_move.clicked.connect(lambda: self.client_state.move(dest=np.random.randint(1, 100)))
        self.btn_connect.clicked.connect(self.client_state.connect_)

        # Show the initial state.
        self.update_gui()

    def update_gui(self):
        """Redraw the state graph and enable only the buttons valid right now.

        "move" is enabled only in the ``ready`` state and "(re-)connect" only
        in the ``unknown`` state; both are disabled while ``moving``.
        """
        print("Update model graph and GUI...")
        self.svgWidget.load(self.client_state.show_graph())
        self.btn_move.setEnabled(self.client_state.is_ready())
        self.btn_connect.setEnabled(self.client_state.is_unknown())
if __name__ == "__main__":
    import sys

    # The application object is created before any QObject/QWidget.
    app = QApplication(sys.argv)

    client = Client()
    client_state = ClientState(client)

    gui = GUI(client_state)
    gui.show()

    # Run the event loop and use its return code as the exit status.
    exit_code = app.exec_()
    sys.exit(exit_code)
一般来说,引入这样一个中间层是一种有效的设计方法吗?
是的,它是有效的。在复杂的应用程序中之所以使用 FSM,正是因为它们能简化逻辑。
关于简化:依我个人看法(IMHO),我更倾向于先确认 Qt 中是否已有类似的工具,因为它们可以通过事件或信号与 Qt 的元素很好地交互。在这种情况下,至少有 2 个选项:QStateMachine(Qt 状态机框架)和 Qt SCXML,下面依次给出两者的示例:
import time
from functools import partial
from PySide2 import QtCore, QtGui, QtWidgets
import numpy as np
class Client(QtCore.QObject):
    """Demo client whose slots simulate work and report the outcome via signals."""

    # Outcome notifications emitted by the slots below.
    sig_move_done = QtCore.Signal()
    sig_disconnected = QtCore.Signal()
    sig_connected = QtCore.Signal()

    @QtCore.Slot(int)
    def client_move(self, dest):
        """Simulate a blocking move to ``dest``.

        Emits ``sig_disconnected`` on the (random) failure path and
        ``sig_move_done`` otherwise.
        """
        print(f"Client moving to {dest}...")
        time.sleep(3)  # stands in for some blocking call
        move_failed = np.random.rand() < 0.5
        if move_failed:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
            return
        print("Movement done...")
        self.sig_move_done.emit()

    @QtCore.Slot()
    def client_disconnect(self):
        """Disconnect (placeholder) and report success via ``sig_disconnected``."""
        self.sig_disconnected.emit()

    @QtCore.Slot()
    def client_connect(self):
        """Connect (placeholder) and report success via ``sig_connected``."""
        self.sig_connected.emit()
class GUI(QtWidgets.QWidget):
    """Window whose buttons and client are coordinated by a ``QStateMachine``."""

    def __init__(self, parent=None):
        """Create the widgets, the client with its thread, and the state machine."""
        super().__init__(parent)
        self.setWindowTitle("State")

        self.btn_move = QtWidgets.QPushButton("move")
        self.btn_connect = QtWidgets.QPushButton("(re-)connect")

        # The client gets its own thread; the thread is stopped in closeEvent.
        self.client = Client()
        self._thread = QtCore.QThread(self)
        self._thread.start()
        self.client.moveToThread(self._thread)

        box = QtWidgets.QVBoxLayout(self)
        for button in (self.btn_move, self.btn_connect):
            box.addWidget(button)
        self.resize(320, 120)

        # States
        self.unknown_state = QtCore.QState()
        self.ready_state = QtCore.QState()
        self.moving_state = QtCore.QState()

        # Transitions as (source state, triggering signal, target state).
        transition_table = (
            (self.ready_state, self.btn_move.clicked, self.moving_state),
            (self.moving_state, self.client.sig_move_done, self.ready_state),
            (self.ready_state, self.client.sig_disconnected, self.unknown_state),
            (self.moving_state, self.client.sig_disconnected, self.unknown_state),
            (self.unknown_state, self.btn_connect.clicked, self.ready_state),
            (self.unknown_state, self.client.sig_connected, self.ready_state),
        )
        for source, signal, target in transition_table:
            source.addTransition(signal, target)

        # Entry handlers update the buttons (and start the move).
        entry_handlers = (
            (self.unknown_state, self.on_unknown_state_enter),
            (self.ready_state, self.on_ready_state_enter),
            (self.moving_state, self.on_moving_state_enter),
        )
        for state, handler in entry_handlers:
            state.entered.connect(handler)

        state_machine = QtCore.QStateMachine(self)
        for state in (self.ready_state, self.moving_state, self.unknown_state):
            state_machine.addState(state)
        state_machine.setInitialState(self.unknown_state)
        state_machine.start()

    def on_unknown_state_enter(self):
        """Entering ``unknown``: only "(re-)connect" is available."""
        print("unknown_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setEnabled(True)

    def on_ready_state_enter(self):
        """Entering ``ready``: only "move" is available."""
        print("ready_state")
        self.btn_move.setEnabled(True)
        self.btn_connect.setDisabled(True)

    def on_moving_state_enter(self):
        """Entering ``moving``: disable both buttons and start a random move."""
        print("moving_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setDisabled(True)
        target = np.random.randint(1, 100)
        # NOTE(review): the intent is for client_move to run in the client's
        # thread; confirm that PySide2 resolves the receiver of a
        # functools.partial to self.client -- otherwise the blocking call
        # executes in the GUI thread.
        QtCore.QTimer.singleShot(0, partial(self.client.client_move, target))

    def closeEvent(self, event):
        """Stop the worker thread and wait for it before the window closes."""
        self._thread.quit()
        self._thread.wait()
        super().closeEvent(event)
if __name__ == "__main__":
    import sys

    # The application object is created before any widget.
    app = QtWidgets.QApplication(sys.argv)

    w = GUI()
    w.show()

    # Run the event loop and use its return code as the exit status.
    exit_code = app.exec_()
    sys.exit(exit_code)
Simple_State_Machine.scxml
<?xml version="1.0" encoding="UTF-8"?>
<scxml xmlns="http://www.w3.org/2005/07/scxml" version="1.0" binding="early" xmlns:qt="http://www.qt.io/2015/02/scxml-ext" name="Simple_State_Machine" qt:editorversion="4.10.0" initial="unknown">
<qt:editorinfo initialGeometry="150.82;359.88;-20;-20;40;40"/>
<state id="ready">
<qt:editorinfo stateColor="#ff974f" geometry="425.83;190.46;-60;-50;120;100" scenegeometry="425.83;190.46;365.83;140.46;120;100"/>
<transition type="internal" event="move" target="moving">
<qt:editorinfo endTargetFactors="35.02;9.52" movePoint="-34.84;14.59" startTargetFactors="32.33;90.16"/>
</transition>
<transition type="internal" event="disconnect" target="unknown">
<qt:editorinfo endTargetFactors="91.87;60.92" movePoint="9.38;9.36" startTargetFactors="6.25;63.37"/>
</transition>
</state>
<state id="unknown">
<qt:editorinfo stateColor="#89725b" geometry="150.82;190.46;-60;-50;120;100" scenegeometry="150.82;190.46;90.82;140.46;120;100"/>
<transition type="internal" target="ready" event="connect">
<qt:editorinfo endTargetFactors="6.34;41.14" movePoint="0;7.30" startTargetFactors="91.13;39.41"/>
</transition>
</state>
<state id="moving">
<qt:editorinfo stateColor="#a508d0" geometry="425.83;344.53;-60;-50;120;100" scenegeometry="425.83;344.53;365.83;294.53;120;100"/>
<transition type="internal" event="disconnect" target="unknown">
<qt:editorinfo movePoint="2.08;17.72"/>
</transition>
<transition type="internal" event="stopped" target="ready">
<qt:editorinfo endTargetFactors="68.30;90.08" movePoint="62.50;10.32" startTargetFactors="68.69;5.74"/>
</transition>
</state>
</scxml>
import os
import time
from functools import partial
from PySide2 import QtCore, QtGui, QtWidgets, QtScxml
import numpy as np
class Client(QtCore.QObject):
    """Demo client whose slots simulate work and report the outcome via signals."""

    # Outcome notifications emitted by the slots below.
    sig_move_done = QtCore.Signal()
    sig_disconnected = QtCore.Signal()
    sig_connected = QtCore.Signal()

    @QtCore.Slot(int)
    def client_move(self, dest):
        """Simulate a blocking move to ``dest``.

        Emits ``sig_disconnected`` on the (random) failure path and
        ``sig_move_done`` otherwise.
        """
        print(f"Client moving to {dest}...")
        time.sleep(3)  # stands in for some blocking call
        move_failed = np.random.rand() < 0.5
        if move_failed:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
            return
        print("Movement done...")
        self.sig_move_done.emit()

    @QtCore.Slot()
    def client_disconnect(self):
        """Disconnect (placeholder) and report success via ``sig_disconnected``."""
        self.sig_disconnected.emit()

    @QtCore.Slot()
    def client_connect(self):
        """Connect (placeholder) and report success via ``sig_connected``."""
        self.sig_connected.emit()
class GUI(QtWidgets.QWidget):
    """Window whose buttons and client are coordinated by an SCXML state machine."""

    def __init__(self, parent=None):
        """Create the widgets, the client with its thread, and load the machine.

        The machine is read from ``Simple_State_Machine.scxml`` located next
        to this file; parse errors are printed.
        """
        super().__init__(parent)
        self.setWindowTitle("State")

        self.btn_move = QtWidgets.QPushButton("move")
        self.btn_connect = QtWidgets.QPushButton("(re-)connect")

        # The client gets its own thread; the thread is stopped in closeEvent.
        self.client = Client()
        self._thread = QtCore.QThread(self)
        self._thread.start()
        self.client.moveToThread(self._thread)

        box = QtWidgets.QVBoxLayout(self)
        for button in (self.btn_move, self.btn_connect):
            box.addWidget(button)
        self.resize(320, 120)

        filename = os.path.join(
            os.path.dirname(os.path.realpath(__file__)),
            "Simple_State_Machine.scxml",
        )
        machine = QtScxml.QScxmlStateMachine.fromFile(filename)
        machine.setParent(self)
        for error in machine.parseErrors():
            print(error.toString())

        # State name -> slot signature notified when the state (de)activates.
        state_slots = (
            ("unknown", "on_unknown_state_enter(bool)"),
            ("ready", "on_ready_state_enter(bool)"),
            ("moving", "on_moving_state_enter(bool)"),
        )
        for state_name, slot_signature in state_slots:
            machine.connectToState(state_name, self, QtCore.SLOT(slot_signature))

        # Signal -> name of the event submitted to the machine.
        event_sources = (
            (self.btn_connect.clicked, "connect"),
            (self.btn_move.clicked, "move"),
            (self.client.sig_disconnected, "disconnect"),
            (self.client.sig_connected, "connect"),
            (self.client.sig_move_done, "stopped"),
        )
        for signal, event_name in event_sources:
            signal.connect(partial(machine.submitEvent, event_name))

        machine.start()

    @QtCore.Slot(bool)
    def on_unknown_state_enter(self, active):
        """When ``unknown`` becomes active: only "(re-)connect" is available."""
        if not active:
            return
        print("unknown_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setEnabled(True)

    @QtCore.Slot(bool)
    def on_ready_state_enter(self, active):
        """When ``ready`` becomes active: only "move" is available."""
        if not active:
            return
        print("ready_state")
        self.btn_move.setEnabled(True)
        self.btn_connect.setDisabled(True)

    @QtCore.Slot(bool)
    def on_moving_state_enter(self, active):
        """When ``moving`` becomes active: disable both buttons and start a random move."""
        if not active:
            return
        print("moving_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setDisabled(True)
        target = np.random.randint(1, 100)
        # NOTE(review): the intent is for client_move to run in the client's
        # thread; confirm that PySide2 resolves the receiver of a
        # functools.partial to self.client -- otherwise the blocking call
        # executes in the GUI thread.
        QtCore.QTimer.singleShot(0, partial(self.client.client_move, target))

    def closeEvent(self, event):
        """Stop the worker thread and wait for it before the window closes."""
        self._thread.quit()
        self._thread.wait()
        super().closeEvent(event)
if __name__ == "__main__":
    import sys

    # The application object is created before any widget.
    app = QtWidgets.QApplication(sys.argv)

    w = GUI()
    w.show()

    # Run the event loop and use its return code as the exit status.
    exit_code = app.exec_()
    sys.exit(exit_code)