如何正确组合 PySide2 和 pytransitions 以实现 GUI 应用程序的状态机

How to properly combine PySide2 and pytransitions for implementing a state machine for GUI application

背景:我想使用 PySide2 实现一个 GUI 来控制一堆客户端(通过 RPC 调用与 'servers' 控制硬件,如电机、相机等)。

以前的方法:通常,我要做的是创建我的 GUI 并将 UI 信号连接到客户端插槽,反之亦然。这对于更简单的应用程序非常有效。

问题:我希望我的 GUI 能够正确地表示对客户端的允许调用。最简单的例子:在执行 client1.doXY() 之后,我想禁用执行该命令的按钮,只有在 doZY() 完成后才重新激活它。虽然上述方法完全有可能,但当事情变得更复杂时,感觉就不对了:例如当 GUI 个元素依赖于多个客户端的状态时。

方法:因此,我认为使用有限状态机作为客户端和 GUI 之间的中间层是个好主意,并遇到了 pytransitions,这看起来很有前途. 但是,我正在努力寻找将这两个世界结合起来的正确方法。

问题:

工作示例:

代码:

import io
import logging
from time import sleep

import numpy as np
from PySide2 import QtSvg, QtWidgets
from PySide2.QtCore import Signal, Slot, QObject, QThread
from PySide2.QtWidgets import QWidget, QPushButton, QApplication
from transitions.extensions import GraphMachine

logging.basicConfig(level=logging.DEBUG)


class Client(QObject):
    # Client signals
    sig_move_done = Signal()
    sig_disconnected = Signal()
    sig_connected = Signal()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @Slot(int)
    def client_move(self, dest):
        print(f'Client moving to {dest}...')
        sleep(3)  # some blocking function
        if np.random.rand() < 0.5:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
        else:
            print("Movement done...")
            self.sig_move_done.emit()

    @Slot()
    def client_disconnect(self):
        # do something then...  on success do:
        self.sig_disconnected.emit()

    @Slot()
    def client_connect(self):
        # do something ... on success do:
        self.sig_connected.emit()


# define states, transitions and extra args for transitions state machine:
states = ['ready', 'moving', 'unknown']

transitions = [
    {'trigger': 'move', 'source': 'ready', 'dest': 'moving'},
    {'trigger': 'stopped', 'source': 'moving', 'dest': 'ready'},
    {'trigger': 'disconnect_', 'source': ['ready', 'moving'], 'dest': 'unknown'},
    {'trigger': 'error', 'source': ['ready', 'moving'], 'dest': 'unknown'},
    {'trigger': 'connect_', 'source': 'unknown', 'dest': 'ready'}

]

extra_args = dict(initial='unknown', title='Simple state machine',
                  show_conditions=True, show_state_attributes=True)


class ClientState(QObject):
    # machine signals
    sig_update_available = Signal()
    sig_move_requested = Signal(int)  # can this be avoided ? see self.on_enter_moving
    sig_connect_requested = Signal()  # can this be avoided ? 

    def __init__(self, client, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = client
        # move client to seperate thread
        self.worker_thread = QThread()
        self.client.moveToThread(self.worker_thread)
        self.worker_thread.start()

        self.machine = GraphMachine(model=self, states=states, transitions=transitions,
                                    show_auto_transitions=False, **extra_args, after_state_change="update_available",
                                    send_event=True)

        # connecting Client signals to state machine triggers
        self.client.sig_disconnected.connect(self.disconnect_)
        self.client.sig_connected.connect(self.connect_)
        self.client.sig_move_done.connect(self.stopped)
        self.update_available = lambda *args, **kwargs: self.sig_update_available.emit()

        # can this be avoided ? see self.on_enter_moving
        self.sig_move_requested.connect(self.client.client_move)
        self.sig_connect_requested.connect(self.client.client_connect)

    def on_enter_moving(self, event):
        print(event.kwargs)
        dest = event.kwargs.get('dest', 0)
        # calling self.client_move() directly will cause self.client_move to be called from main thread...
        # calling it via a helper signal instead:
        self.sig_move_requested.emit(dest)

    def show_graph(self, **kwargs):
        stream = io.BytesIO()
        self.get_graph(**kwargs).draw(stream, prog='dot', format='svg')
        return stream.getvalue()


class GUI(QWidget):
    def __init__(self, client_state):
        super().__init__()
        self.client_state = client_state

        # setup UI
        self.setWindowTitle("State")
        self.svgWidget = QtSvg.QSvgWidget()
        self.layout = QtWidgets.QVBoxLayout()
        self.layout.addWidget(self.svgWidget)
        self.btn_move = QPushButton("move")
        self.btn_connect = QPushButton("(re-)connect")
        self.layout.addWidget(self.btn_move)
        self.layout.addWidget(self.btn_connect)

        self.setLayout(self.layout)

        # Connect Slots/Signals
        ## machine -> GUI
        self.client_state.sig_update_available.connect(self.update_gui)

        ## GUI --> machine
        self.btn_move.clicked.connect(lambda: self.client_state.move(dest=np.random.randint(1, 100)))
        self.btn_connect.clicked.connect(
            self.client_state.connect_)

        # update UI
        self.update_gui()

    def update_gui(self):
        print("Update model graph and GUI...")
        self.svgWidget.load(self.client_state.show_graph())

        if self.client_state.is_ready():
            self.btn_move.setEnabled(True)
            self.btn_connect.setDisabled(True)
        if self.client_state.is_moving():
            self.btn_move.setDisabled(True)
            self.btn_connect.setDisabled(True)
        if self.client_state.is_unknown():
            self.btn_move.setDisabled(True)
            self.btn_connect.setEnabled(True)


if __name__ == "__main__":
    import sys

    app = QApplication(sys.argv)
    client = Client()
    client_state = ClientState(client)
    gui = GUI(client_state)
    gui.show()
    sys.exit(app.exec_())

通常来说,这是拥有这样一个层的有效设计方法吗?

是的,它是有效的,在复杂的应用程序中,FSM 的实现是因为它们简化了逻辑。


关于恕我直言的简化,我更愿意验证在这种情况下 Qt 中是否存在类似的工具,因为它们通过事件或信号与 Qt 的元素友好地交互。在这种情况下,至少有 2 个选项:

The State Machine Framework:

import time
from functools import partial
from PySide2 import QtCore, QtGui, QtWidgets
import numpy as np


class Client(QtCore.QObject):
    # Client signals
    sig_move_done = QtCore.Signal()
    sig_disconnected = QtCore.Signal()
    sig_connected = QtCore.Signal()

    @QtCore.Slot(int)
    def client_move(self, dest):
        print(f"Client moving to {dest}...")
        time.sleep(3)  # some blocking function
        if np.random.rand() < 0.5:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
        else:
            print("Movement done...")
            self.sig_move_done.emit()

    @QtCore.Slot()
    def client_disconnect(self):
        # do something then...  on success do:
        self.sig_disconnected.emit()

    @QtCore.Slot()
    def client_connect(self):
        # do something ... on success do:
        self.sig_connected.emit()


class GUI(QtWidgets.QWidget):
    def __init__(self, parent=None):
        super().__init__(parent)
        self.setWindowTitle("State")

        self.btn_move = QtWidgets.QPushButton("move")
        self.btn_connect = QtWidgets.QPushButton("(re-)connect")

        self.client = Client()
        self._thread = QtCore.QThread(self)
        self._thread.start()
        self.client.moveToThread(self._thread)

        lay = QtWidgets.QVBoxLayout(self)
        lay.addWidget(self.btn_move)
        lay.addWidget(self.btn_connect)
        self.resize(320, 120)

        # states
        self.unknown_state = QtCore.QState()
        self.ready_state = QtCore.QState()
        self.moving_state = QtCore.QState()

        # transitions
        self.ready_state.addTransition(self.btn_move.clicked, self.moving_state)
        self.moving_state.addTransition(self.client.sig_move_done, self.ready_state)
        self.ready_state.addTransition(self.client.sig_disconnected, self.unknown_state)
        self.moving_state.addTransition(self.client.sig_disconnected, self.unknown_state)
        self.unknown_state.addTransition(self.btn_connect.clicked, self.ready_state)
        self.unknown_state.addTransition(self.client.sig_connected, self.ready_state)

        self.unknown_state.entered.connect(self.on_unknown_state_enter)
        self.ready_state.entered.connect(self.on_ready_state_enter)
        self.moving_state.entered.connect(self.on_moving_state_enter)

        state_machine = QtCore.QStateMachine(self)
        state_machine.addState(self.ready_state)
        state_machine.addState(self.moving_state)
        state_machine.addState(self.unknown_state)

        state_machine.setInitialState(self.unknown_state)
        state_machine.start()

    def on_unknown_state_enter(self):
        print("unknown_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setEnabled(True)

    def on_ready_state_enter(self):
        print("ready_state")
        self.btn_move.setEnabled(True)
        self.btn_connect.setDisabled(True)

    def on_moving_state_enter(self):
        print("moving_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setDisabled(True)
        dest = np.random.randint(1, 100)
        wrapper = partial(self.client.client_move, dest)
        QtCore.QTimer.singleShot(0, wrapper)

    def closeEvent(self, event):
        self._thread.quit()
        self._thread.wait()
        super().closeEvent(event)


if __name__ == "__main__":
    import sys

    app = QtWidgets.QApplication(sys.argv)

    w = GUI()
    w.show()

    sys.exit(app.exec_())

Qt SCXML:

Simple_State_Machine.scxml

<?xml version="1.0" encoding="UTF-8"?>
<scxml xmlns="http://www.w3.org/2005/07/scxml" version="1.0" binding="early" xmlns:qt="http://www.qt.io/2015/02/scxml-ext" name="Simple_State_Machine" qt:editorversion="4.10.0" initial="unknown">
    <qt:editorinfo initialGeometry="150.82;359.88;-20;-20;40;40"/>
    <state id="ready">
        <qt:editorinfo stateColor="#ff974f" geometry="425.83;190.46;-60;-50;120;100" scenegeometry="425.83;190.46;365.83;140.46;120;100"/>
        <transition type="internal" event="move" target="moving">
            <qt:editorinfo endTargetFactors="35.02;9.52" movePoint="-34.84;14.59" startTargetFactors="32.33;90.16"/>
        </transition>
        <transition type="internal" event="disconnect" target="unknown">
            <qt:editorinfo endTargetFactors="91.87;60.92" movePoint="9.38;9.36" startTargetFactors="6.25;63.37"/>
        </transition>
    </state>
    <state id="unknown">
        <qt:editorinfo stateColor="#89725b" geometry="150.82;190.46;-60;-50;120;100" scenegeometry="150.82;190.46;90.82;140.46;120;100"/>
        <transition type="internal" target="ready" event="connect">
            <qt:editorinfo endTargetFactors="6.34;41.14" movePoint="0;7.30" startTargetFactors="91.13;39.41"/>
        </transition>
    </state>
    <state id="moving">
        <qt:editorinfo stateColor="#a508d0" geometry="425.83;344.53;-60;-50;120;100" scenegeometry="425.83;344.53;365.83;294.53;120;100"/>
        <transition type="internal" event="disconnect" target="unknown">
            <qt:editorinfo movePoint="2.08;17.72"/>
        </transition>
        <transition type="internal" event="stopped" target="ready">
            <qt:editorinfo endTargetFactors="68.30;90.08" movePoint="62.50;10.32" startTargetFactors="68.69;5.74"/>
        </transition>
    </state>
</scxml>

import os
import time
from functools import partial
from PySide2 import QtCore, QtGui, QtWidgets, QtScxml
import numpy as np


class Client(QtCore.QObject):
    # Client signals
    sig_move_done = QtCore.Signal()
    sig_disconnected = QtCore.Signal()
    sig_connected = QtCore.Signal()

    @QtCore.Slot(int)
    def client_move(self, dest):
        print(f"Client moving to {dest}...")
        time.sleep(3)  # some blocking function
        if np.random.rand() < 0.5:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
        else:
            print("Movement done...")
            self.sig_move_done.emit()

    @QtCore.Slot()
    def client_disconnect(self):
        # do something then...  on success do:
        self.sig_disconnected.emit()

    @QtCore.Slot()
    def client_connect(self):
        # do something ... on success do:
        self.sig_connected.emit()


class GUI(QtWidgets.QWidget):
    def __init__(self, parent=None):
        super().__init__(parent)
        self.setWindowTitle("State")

        self.btn_move = QtWidgets.QPushButton("move")
        self.btn_connect = QtWidgets.QPushButton("(re-)connect")

        self.client = Client()
        self._thread = QtCore.QThread(self)
        self._thread.start()
        self.client.moveToThread(self._thread)

        lay = QtWidgets.QVBoxLayout(self)
        lay.addWidget(self.btn_move)
        lay.addWidget(self.btn_connect)
        self.resize(320, 120)

        current_dir = os.path.dirname(os.path.realpath(__file__))
        filename = os.path.join(current_dir, "Simple_State_Machine.scxml")

        machine = QtScxml.QScxmlStateMachine.fromFile(filename)
        machine.setParent(self)

        for error in machine.parseErrors():
            print(error.toString())

        machine.connectToState("unknown", self, QtCore.SLOT("on_unknown_state_enter(bool)"))
        machine.connectToState("ready", self, QtCore.SLOT("on_ready_state_enter(bool)"))
        machine.connectToState("moving", self, QtCore.SLOT("on_moving_state_enter(bool)"))


        self.btn_connect.clicked.connect(partial(machine.submitEvent, "connect"))
        self.btn_move.clicked.connect(partial(machine.submitEvent, "move"))

        self.client.sig_disconnected.connect(partial(machine.submitEvent, "disconnect"))
        self.client.sig_connected.connect(partial(machine.submitEvent, "connect"))
        self.client.sig_move_done.connect(partial(machine.submitEvent, "stopped"))

        machine.start()

    @QtCore.Slot(bool)
    def on_unknown_state_enter(self, active):
        if active:
            print("unknown_state")
            self.btn_move.setDisabled(True)
            self.btn_connect.setEnabled(True)

    @QtCore.Slot(bool)
    def on_ready_state_enter(self, active):
        if active:
            print("ready_state")
            self.btn_move.setEnabled(True)
            self.btn_connect.setDisabled(True)

    @QtCore.Slot(bool)
    def on_moving_state_enter(self, active):
        if active:
            print("moving_state")
            self.btn_move.setDisabled(True)
            self.btn_connect.setDisabled(True)
            dest = np.random.randint(1, 100)
            wrapper = partial(self.client.client_move, dest)
            QtCore.QTimer.singleShot(0, wrapper)

    def closeEvent(self, event):
        self._thread.quit()
        self._thread.wait()
        super().closeEvent(event)


if __name__ == "__main__":
    import sys

    app = QtWidgets.QApplication(sys.argv)

    w = GUI()
    w.show()

    sys.exit(app.exec_())