pyTransitions trigger() 方法块
pyTransitions trigger() method blocks
我有一个相当复杂的应用程序,涉及 GUI 前端和其他几个 类,其中几个是基于优秀 pytransitions 库的状态机。
应用程序在不同时间挂起,并且是多线程很难调试,但我在这里复制了一个最小的例子:
from transitions import Machine
import time
States = ["OFF", "ON"]
Transitions = [{"trigger": "PowerUp", "source": "OFF", "dest": "ON"}]
class GUI:
def __init__(self):
self.machine1 = Machine_1(self)
self.machine2 = Machine_2(self)
def on_button_pressed(self):
self.machine2.trigger("PowerUp")
self.machine1.trigger("PowerUp")
class Machine_1:
def __init__(self, parent):
self.parent = parent
self.fsm = Machine(model=self, states=States, transitions=Transitions, initial="OFF")
def on_enter_ON(self):
print("machine1 is on!")
self.parent.machine2.signal = True
class Machine_2:
def __init__(self, parent):
self.parent = parent
self.fsm = Machine(model=self, states=States, transitions=Transitions, initial="OFF")
self.signal = False
def on_enter_ON(self):
print("machine2 waits for machine1 ...")
while self.signal is False:
time.sleep(0.1)
print("machine2 on!")
即当 machine1 “打开”时,它向 machine2 发送信号。 machine2 在执行其“开启”状态的逻辑之前必须等待此信号。
使用Python REPL 作为主循环:
>>> import test
>>> gui = test.GUI()
>>> gui.machine1.state
'off'
>>> gui.machine2.state
'off'
>>> gui.on_button_pressed()
machine2 waits for machine1 ...
(never returns)
machine2 卡在其 while 循环中,等待来自 machine1 的信号,该信号从未到达,因为 machine1 从未被触发。
如果我重新排序 gui.on_button_pressed()
中的触发器序列:
self.machine1.trigger("PowerUp")
self.machine2.trigger("PowerUp")
...一切正常:
>>> gui.on_button_pressed()
machine1 is on!
machine2 waits for machine1 ...
machine2 is on!
这是怎么回事? trigger()
方法应该阻塞吗?它是否记录在某处,什么时候 return?或者我是否以不受支持的方式使用 on_enter_state
回调?有我应该使用的推荐模式吗?
pytransitions
的核心Machine
既不是线程也不是异步的。这意味着当回调不 return 时,整个事件处理和触发函数将阻塞。有多种方法可以解决这个问题。选择哪种方式取决于您的体系结构。为所有方法提供 MWE 对于 SO 答案来说有点太多了,但我可以简要概述解决方案的样子。
事件驱动
因为 Machine_1
已经引用了 Machine_2
并且 Machine_2
正在等待来自 Machine_1
的事件,您可以显式地模拟该转换:
Transitions = [{"trigger": "PowerUp", "source": "OFF", "dest": "ON"}]
# ...
self.machine2 = Machine_2(self)
self.machine2.add_transition("Machine1PoweredUp", "OFF", "ON")
# ...
def on_enter_ON(self):
print("machine1 is on!")
self.parent.machine2.Machine1PoweredUp()
请注意,当您拥有共享相同状态和转换的机器时,您可以使用一台机器和多个模型。
def Model_1():
def on_enter_ON(self):
# a dispatched event is forwarded to all models attached to the
# machine in question. We consider `parent` to be such a machine here.
self.parent.dispatch("Machine1PoweredUp")
s1 = Model_1()
s2 = Model_2()
# we set `ignore_invalid_triggers` since `s1` need to process the fired
# event as well even though it is already in state `ON`
m = Machine(model=[s1, s2], ignore_invalid_triggers=True)
s1.parent = m
s1.PowerUp()
您可以将共享机器用作某种'event bus',这可以减少耦合和依赖性。
轮询
您可以使 Machine_2
'attempt' 具有条件和 return 的转换,而不是在主循环中阻塞。条件可以是模型方法的名称或对可调用对象的引用。由于 pytransitions
总是向模型添加方便的状态检查功能,我们可以将状态检查 is_ON
从 machine1
传递到转换定义。当不满足条件时,条件将停止并return,从而启用顺序工作流。
self.machine2.add_transition("PowerUp", "OFF", "ON", conditions=machine1.is_ON)
# ...
while not machine2.is_ON:
time.sleep(0.1)
machine2.PowerUp()
不过您应该注意,当执行 on_enter_<state>
回调时,machine/model 已被视为处于该状态。对于您的特定用例,这意味着 machine1.is_ON
returns True
即使 Machine_1
的 on_enter_ON
回调仍在处理中。
线程
您也可以为机器 2 中的机器 1 状态检查创建一个线程。对于 'unblock' 过渡,我建议添加一个临时状态,例如 Booting
以表明 Machine_2
尚未准备好。
class Machine_2:
def check_state(self):
while self.signal is False:
time.sleep(0.1)
self.Machine1Ready() # transition from 'Booting' to 'ON'
def on_enter_ON(self):
print("machine2 on!")
def on_enter_BOOTING(self):
print("machine2 waits for machine1 ...")
threading.Thread(target=self.check_state).start()
transitions
具有 LockedMachine
以防多个线程需要访问同一台机器。然而,对状态机的线程访问有一些缺陷,根据我的经验,调试起来相当困难。
异步
这是一个简短的例子,说明如何使用 AsyncMachine
来实现这样的目标。此示例用于说明目的,不应以这种方式复制:
from transitions.extensions.asyncio import AsyncMachine
import asyncio
class Model1:
done = False
async def on_enter_ON(self):
await asyncio.sleep(0.1)
print("Machine 1 done")
self.done = True
model1 = Model1()
class Model2:
done = False
async def on_enter_ON(self):
while not model1.done:
print("Machine 1 not ready yet")
await asyncio.sleep(0.05)
print("Machine 2 done")
model2 = Model2()
m = AsyncMachine(model=[model1, model2], states=["OFF", "ON"], initial="OFF")
asyncio.get_event_loop().run_until_complete(asyncio.gather(model1.to_ON(), model2.to_ON()))
我有一个相当复杂的应用程序,涉及 GUI 前端和其他几个 类,其中几个是基于优秀 pytransitions 库的状态机。 应用程序在不同时间挂起,并且是多线程很难调试,但我在这里复制了一个最小的例子:
from transitions import Machine
import time
States = ["OFF", "ON"]
Transitions = [{"trigger": "PowerUp", "source": "OFF", "dest": "ON"}]
class GUI:
def __init__(self):
self.machine1 = Machine_1(self)
self.machine2 = Machine_2(self)
def on_button_pressed(self):
self.machine2.trigger("PowerUp")
self.machine1.trigger("PowerUp")
class Machine_1:
def __init__(self, parent):
self.parent = parent
self.fsm = Machine(model=self, states=States, transitions=Transitions, initial="OFF")
def on_enter_ON(self):
print("machine1 is on!")
self.parent.machine2.signal = True
class Machine_2:
def __init__(self, parent):
self.parent = parent
self.fsm = Machine(model=self, states=States, transitions=Transitions, initial="OFF")
self.signal = False
def on_enter_ON(self):
print("machine2 waits for machine1 ...")
while self.signal is False:
time.sleep(0.1)
print("machine2 on!")
即当 machine1 “打开”时,它向 machine2 发送信号。 machine2 在执行其“开启”状态的逻辑之前必须等待此信号。
使用Python REPL 作为主循环:
>>> import test
>>> gui = test.GUI()
>>> gui.machine1.state
'off'
>>> gui.machine2.state
'off'
>>> gui.on_button_pressed()
machine2 waits for machine1 ...
(never returns)
machine2 卡在其 while 循环中,等待来自 machine1 的信号,该信号从未到达,因为 machine1 从未被触发。
如果我重新排序 gui.on_button_pressed()
中的触发器序列:
self.machine1.trigger("PowerUp")
self.machine2.trigger("PowerUp")
...一切正常:
>>> gui.on_button_pressed()
machine1 is on!
machine2 waits for machine1 ...
machine2 is on!
这是怎么回事? trigger()
方法应该阻塞吗?它是否记录在某处,什么时候 return?或者我是否以不受支持的方式使用 on_enter_state
回调?有我应该使用的推荐模式吗?
pytransitions
的核心Machine
既不是线程也不是异步的。这意味着当回调不 return 时,整个事件处理和触发函数将阻塞。有多种方法可以解决这个问题。选择哪种方式取决于您的体系结构。为所有方法提供 MWE 对于 SO 答案来说有点太多了,但我可以简要概述解决方案的样子。
事件驱动
因为 Machine_1
已经引用了 Machine_2
并且 Machine_2
正在等待来自 Machine_1
的事件,您可以显式地模拟该转换:
Transitions = [{"trigger": "PowerUp", "source": "OFF", "dest": "ON"}]
# ...
self.machine2 = Machine_2(self)
self.machine2.add_transition("Machine1PoweredUp", "OFF", "ON")
# ...
def on_enter_ON(self):
print("machine1 is on!")
self.parent.machine2.Machine1PoweredUp()
请注意,当您拥有共享相同状态和转换的机器时,您可以使用一台机器和多个模型。
def Model_1():
def on_enter_ON(self):
# a dispatched event is forwarded to all models attached to the
# machine in question. We consider `parent` to be such a machine here.
self.parent.dispatch("Machine1PoweredUp")
s1 = Model_1()
s2 = Model_2()
# we set `ignore_invalid_triggers` since `s1` need to process the fired
# event as well even though it is already in state `ON`
m = Machine(model=[s1, s2], ignore_invalid_triggers=True)
s1.parent = m
s1.PowerUp()
您可以将共享机器用作某种'event bus',这可以减少耦合和依赖性。
轮询
您可以使 Machine_2
'attempt' 具有条件和 return 的转换,而不是在主循环中阻塞。条件可以是模型方法的名称或对可调用对象的引用。由于 pytransitions
总是向模型添加方便的状态检查功能,我们可以将状态检查 is_ON
从 machine1
传递到转换定义。当不满足条件时,条件将停止并return,从而启用顺序工作流。
self.machine2.add_transition("PowerUp", "OFF", "ON", conditions=machine1.is_ON)
# ...
while not machine2.is_ON:
time.sleep(0.1)
machine2.PowerUp()
不过您应该注意,当执行 on_enter_<state>
回调时,machine/model 已被视为处于该状态。对于您的特定用例,这意味着 machine1.is_ON
returns True
即使 Machine_1
的 on_enter_ON
回调仍在处理中。
线程
您也可以为机器 2 中的机器 1 状态检查创建一个线程。对于 'unblock' 过渡,我建议添加一个临时状态,例如 Booting
以表明 Machine_2
尚未准备好。
class Machine_2:
def check_state(self):
while self.signal is False:
time.sleep(0.1)
self.Machine1Ready() # transition from 'Booting' to 'ON'
def on_enter_ON(self):
print("machine2 on!")
def on_enter_BOOTING(self):
print("machine2 waits for machine1 ...")
threading.Thread(target=self.check_state).start()
transitions
具有 LockedMachine
以防多个线程需要访问同一台机器。然而,对状态机的线程访问有一些缺陷,根据我的经验,调试起来相当困难。
异步
这是一个简短的例子,说明如何使用 AsyncMachine
来实现这样的目标。此示例用于说明目的,不应以这种方式复制:
from transitions.extensions.asyncio import AsyncMachine
import asyncio
class Model1:
done = False
async def on_enter_ON(self):
await asyncio.sleep(0.1)
print("Machine 1 done")
self.done = True
model1 = Model1()
class Model2:
done = False
async def on_enter_ON(self):
while not model1.done:
print("Machine 1 not ready yet")
await asyncio.sleep(0.05)
print("Machine 2 done")
model2 = Model2()
m = AsyncMachine(model=[model1, model2], states=["OFF", "ON"], initial="OFF")
asyncio.get_event_loop().run_until_complete(asyncio.gather(model1.to_ON(), model2.to_ON()))