pyTransitions trigger() 方法块

pyTransitions trigger() method blocks

我有一个相当复杂的应用程序,涉及 GUI 前端和其他几个 类,其中几个是基于优秀 pytransitions 库的状态机。 应用程序在不同时间挂起,并且是多线程很难调试,但我在这里复制了一个最小的例子:

from transitions import Machine
import time

States = ["OFF", "ON"]
Transitions = [{"trigger": "PowerUp", "source": "OFF", "dest": "ON"}]

class GUI:
    def __init__(self):
        self.machine1 = Machine_1(self)
        self.machine2 = Machine_2(self)

    def on_button_pressed(self):
        self.machine2.trigger("PowerUp")
        self.machine1.trigger("PowerUp")


class Machine_1:
    def __init__(self, parent):
        self.parent = parent
        self.fsm = Machine(model=self, states=States, transitions=Transitions, initial="OFF")

    def on_enter_ON(self):
        print("machine1 is on!")
        self.parent.machine2.signal = True


class Machine_2:
    def __init__(self, parent):
        self.parent = parent
        self.fsm = Machine(model=self, states=States, transitions=Transitions, initial="OFF")
        self.signal = False

    def on_enter_ON(self):
        print("machine2 waits for machine1 ...")
        while self.signal is False:
            time.sleep(0.1)
        print("machine2 on!")

即当 machine1 “打开”时,它向 machine2 发送信号。 machine2 在执行其“开启”状态的逻辑之前必须等待此信号。

使用Python REPL 作为主循环:

>>> import test
>>> gui = test.GUI()        
>>> gui.machine1.state      
'off'
>>> gui.machine2.state      
'off'
>>> gui.on_button_pressed()
machine2 waits for machine1 ...
(never returns)

machine2 卡在其 while 循环中,等待来自 machine1 的信号,该信号从未到达,因为 machine1 从未被触发。

如果我重新排序 gui.on_button_pressed() 中的触发器序列:

self.machine1.trigger("PowerUp")
self.machine2.trigger("PowerUp")

...一切正常:

>>> gui.on_button_pressed()
machine1 is on!
machine2 waits for machine1 ...
machine2 is on!

这是怎么回事? trigger() 方法应该阻塞吗?它是否记录在某处,什么时候 return?或者我是否以不受支持的方式使用 on_enter_state 回调?有我应该使用的推荐模式吗?

pytransitions的核心Machine既不是线程也不是异步的。这意味着当回调不 return 时,整个事件处理和触发函数将阻塞。有多种方法可以解决这个问题。选择哪种方式取决于您的体系结构。为所有方法提供 MWE 对于 SO 答案来说有点太多了,但我可以简要概述解决方案的样子。

事件驱动

因为 Machine_1 已经引用了 Machine_2 并且 Machine_2 正在等待来自 Machine_1 的事件,您可以显式地模拟该转换:


Transitions = [{"trigger": "PowerUp", "source": "OFF", "dest": "ON"}]
# ...
self.machine2 = Machine_2(self)
self.machine2.add_transition("Machine1PoweredUp", "OFF", "ON")
# ... 
    def on_enter_ON(self):
        print("machine1 is on!")
        self.parent.machine2.Machine1PoweredUp()

请注意,当您拥有共享相同状态和转换的机器时,您可以使用一台机器和多个模型。

def Model_1():

    def on_enter_ON(self):
        # a dispatched event is forwarded to all models attached to the 
        # machine in question. We consider `parent` to be such a machine here.
        self.parent.dispatch("Machine1PoweredUp")

s1 = Model_1()
s2 = Model_2()
# we set `ignore_invalid_triggers` since `s1` need to process the fired
# event as well even though it is already in state `ON`
m = Machine(model=[s1, s2], ignore_invalid_triggers=True)
s1.parent = m
s1.PowerUp()

您可以将共享机器用作某种'event bus',这可以减少耦合和依赖性。

轮询

您可以使 Machine_2 'attempt' 具有条件和 return 的转换,而不是在主循环中阻塞。条件可以是模型方法的名称或对可调用对象的引用。由于 pytransitions 总是向模型添加方便的状态检查功能,我们可以将状态检查 is_ONmachine1 传递到转换定义。当不满足条件时,条件将停止并return,从而启用顺序工作流。

self.machine2.add_transition("PowerUp", "OFF", "ON", conditions=machine1.is_ON)
# ...

while not machine2.is_ON:
  time.sleep(0.1)
  machine2.PowerUp()

不过您应该注意,当执行 on_enter_<state> 回调时,machine/model 已被视为处于该状态。对于您的特定用例,这意味着 machine1.is_ON returns True 即使 Machine_1on_enter_ON 回调仍在处理中。

线程

您也可以为机器 2 中的机器 1 状态检查创建一个线程。对于 'unblock' 过渡,我建议添加一个临时状态,例如 Booting 以表明 Machine_2 尚未准备好。


class Machine_2:

    def check_state(self):
        while self.signal is False:
            time.sleep(0.1)
        self.Machine1Ready()  # transition from 'Booting' to 'ON'

    def on_enter_ON(self):
        print("machine2 on!")

    def on_enter_BOOTING(self):
        print("machine2 waits for machine1 ...")
        threading.Thread(target=self.check_state).start()

transitions 具有 LockedMachine 以防多个线程需要访问同一台机器。然而,对状态机的线程访问有一些缺陷,根据我的经验,调试起来相当困难。

异步

这是一个简短的例子,说明如何使用 AsyncMachine 来实现这样的目标。此示例用于说明目的,不应以这种方式复制:

from transitions.extensions.asyncio import AsyncMachine
import asyncio


class Model1:

    done = False

    async def on_enter_ON(self):
        await asyncio.sleep(0.1)
        print("Machine 1 done")
        self.done = True


model1 = Model1()


class Model2:

    done = False

    async def on_enter_ON(self):
        while not model1.done:
            print("Machine 1 not ready yet")
            await asyncio.sleep(0.05)
        print("Machine 2 done")


model2 = Model2()

m = AsyncMachine(model=[model1, model2], states=["OFF", "ON"], initial="OFF")
asyncio.get_event_loop().run_until_complete(asyncio.gather(model1.to_ON(), model2.to_ON()))