如何委托监听“transitions”状态机的进入状态?
How to delegate listening to entering states of a `transitions` state machine?
我正在尝试使用 transitions 库。
这个问题跟在 之后,相当松散。
我想将对 on_enter
事件的监听委托给所有状态,并创建几个这样的监听器,它们可以订阅并在进入状态时收到通知。
就我而言,我想通知外部事件系统根据状态订阅不同的事件配置。
对于这个例子,我将使用状态机(比如固体<->流体<->气体,事件为 [热,冷])。
这可以很容易地使用像这样的库来完成
from transitions import Machine
from transitions import EventData
class Matter(object):
def __init__(self):
transitions = [
{'trigger': 'heat', 'source': 'solid', 'dest': 'liquid'},
{'trigger': 'heat', 'source': 'liquid', 'dest': 'gas'},
{'trigger': 'cool', 'source': 'gas', 'dest': 'liquid'},
{'trigger': 'cool', 'source': 'liquid', 'dest': 'solid'}
]
self.machine = Machine(
model=self,
states=['solid', 'liquid', 'gas'],
transitions=transitions,
initial='solid',
send_event=True
)
def on_enter_gas(self, event: EventData):
print(f"entering gas from {event.transition.source}")
def on_enter_liquid(self, event: EventData):
print(f"entering liquid from {event.transition.source}")
def on_enter_solid(self, event: EventData):
print(f"entering solid from {event.transition.source}")
matter = Matter()
matter.heat() # entering liquid from solid
matter.heat() # entering gas from liquid
matter.cool() # entering liquid from gas
matter.cool() # entering solid from liquid
太棒了!现在,我想通过订阅向外部通知有关 on_enter
个事件。
我想以一种最少将外部世界耦合到机器内部的方式来做到这一点,这样如果我要更改状态名称,或者添加或删除状态,我就不会担心破坏任何用户机器。
我可以实现的一种方法如下,缺点是耦合到机器内部,并迫使我自己实现库的很多功能。
from transitions import Machine
from transitions import EventData
from typing import Callable
class Matter(object):
states = ['solid', 'liquid', 'gas']
def __init__(self):
transitions = [
{'trigger': 'heat', 'source': 'solid', 'dest': 'liquid'},
{'trigger': 'heat', 'source': 'liquid', 'dest': 'gas'},
{'trigger': 'cool', 'source': 'gas', 'dest': 'liquid'},
{'trigger': 'cool', 'source': 'liquid', 'dest': 'solid'}
]
self.machine = Machine(
model=self,
states=self.states,
transitions=transitions,
initial='solid',
send_event=True
)
self._subscriptions = {}
def on_enter_gas(self, event: EventData):
print(f"entering gas from {event.transition.source}")
if "on_enter_gas" in self._subscriptions:
self._subscriptions["on_enter_solid"]()
def on_enter_liquid(self, event: EventData):
print(f"entering liquid from {event.transition.source}")
if "on_enter_liquid" in self._subscriptions:
self._subscriptions["on_enter_solid"]()
def on_enter_solid(self, event: EventData):
print(f"entering solid from {event.transition.source}")
if "on_enter_solid" in self._subscriptions:
self._subscriptions["on_enter_solid"]()
def subscribe(self, state: str, trigger: str, callback: Callable):
assert state in self.states
machine_event = trigger + "_" + state
if machine_event not in self._subscriptions:
self._subscriptions[machine_event] = callback
这允许为任何状态添加外部回调。
根据评论 , the above should have some better API to dynamically add subscriptions per state, but I was not able to find it in the doc。
即使库确实可以做到这一点,我认为这还不够。
任何订阅者都必须知道机器的状态才能订阅它们,而不是简单地成为机器上的监听器,并实现任何事件以通知它发生,就像一个人可以只需通过状态“固体”的存在即可轻松添加 on_enter_solid
。
我最想做的是有一些监听器 class 我可以继承(或以其他方式)并且只实现我需要监听的方法,外部 .
完成此操作的最佳方法是什么,或者使用库进行类似操作?
I want to notify externally, via subscriptions about on_enter events.
I want to do that in a way that would least couple the outside world to the insides of the machine, so that if I were to change a state name, or add or remove a state, I wouldn't worry about breaking any users of the machine.
耦合度最低的是只转发事件并让订阅者决定如何处理它:
from transitions import Machine
from transitions import EventData
from typing import Callable
class Observer:
def state_changed(self, event_data: EventData):
print(f"state is now '{event_data.state.name}'")
class SubscribableMachine(Machine):
states = ['solid', 'liquid', 'gas']
transitions = [
{'trigger': 'heat', 'source': 'solid', 'dest': 'liquid'},
{'trigger': 'heat', 'source': 'liquid', 'dest': 'gas'},
{'trigger': 'cool', 'source': 'gas', 'dest': 'liquid'},
{'trigger': 'cool', 'source': 'liquid', 'dest': 'solid'}
]
def __init__(self):
super().__init__(states=self.states, transitions=self.transitions,
initial='solid', after_state_change="notify",
send_event=True)
self._subscriptions = []
def notify(self, event_data: EventData):
for func in self._subscriptions:
func(event_data)
def subscribe(self, func: Callable):
self._subscriptions.append(func)
machine = SubscribableMachine()
observer = Observer()
machine.subscribe(observer.state_changed)
machine.heat() # >>> state is now 'LIQUID'
如果您让观察者订阅特定的转换 and/or 状态事件,这显然会在您稍后重命名这些事件时破坏它们的代码。但是,在我看来,仅传递事件会大大降低状态机和一般状态模式的实用性,因为它是状态模式中最好的部分之一,它摆脱了 if-elif-else-cascades。
What I would ideally like to do is have some listener class I can inherit (or otherwise) and only implement the methods I need to listen to, externally.
我会说你不需要特定的听众 class。您可以直接将可调用对象添加到状态 enter/exit 回调中。此外,您可以将字符串替换为 (string) Enums 作为状态标识符。这样,您可以在不影响观察者的情况下更改 Enum 的值。这可以防止在订阅特定状态时出现拼写错误:
from transitions import Machine
from transitions import EventData
from typing import Callable
from enum import Enum, auto
class Observer:
def state_changed(self, event_data: EventData):
print(f"state is now '{event_data.state.name}'")
class State(Enum):
SOLID = auto()
LIQUID = auto()
GAS = auto()
class SubscribableMachine(Machine):
transitions = [
{'trigger': 'heat', 'source': State.SOLID, 'dest': State.LIQUID},
{'trigger': 'heat', 'source': State.LIQUID, 'dest': State.GAS},
{'trigger': 'cool', 'source': State.GAS, 'dest': State.LIQUID},
{'trigger': 'cool', 'source': State.LIQUID, 'dest': State.SOLID}
]
def __init__(self):
super().__init__(states=State, transitions=self.transitions,
initial=State.SOLID, send_event=True)
def subscribe(self, func: Callable, state: State):
self.get_state(state).on_enter.append(func)
def unsubscribe(self, func: Callable, state: State):
self.get_state(state).on_enter.remove(func)
machine = SubscribableMachine()
observer = Observer()
machine.subscribe(observer.state_changed, State.LIQUID)
machine.heat() # >>> state is now 'LIQUID'
machine.heat()
assert machine.state == State.GAS
machine.unsubscribe(observer.state_changed, State.LIQUID)
machine.cool() # no output
assert machine.state == State.LIQUID
What is the syntax to subscribe in the same way to specific transitions?
对于转换,您可以使用 machine.get_transitions(trigger, source, dest)
来获取一组转换。正如文档中提到的(例如 Callback execution order),转换具有两个回调事件:before
和 after
。如果您想在转换发生后(也在调用 State.enter
之后)收到通知,您的 subscribe/unsubscribe 方法可能如下所示:
def subscribe(self, func, trigger="", source="*", dest="*"):
for transition in self.get_transitions(trigger, source, dest):
transition.after.append(func)
def unsubscribe(self, func, trigger="", source="*", dest="*"):
for transition in self.get_transitions(trigger, source, dest):
transition.after.remove(func)
# ...
machine.subscribe(observer.state_changed, "heat")
machine.heat() >>> state is now 'LIQUID'
machine.heat() >>> state is now 'GAS'
您可以改为 before
并查看 state_changed
的输出如何变化。此外,您可以通过 source
或 destination
进一步缩小范围:
machine.subscribe(observer.state_changed, "heat", source=State.LIQUID)
# ...
machine.heat() >>> <nothing>
machine.heat() >>> state is now 'GAS'
要取消订阅,您需要记住过滤器设置或在 list.remove
尝试删除不在回调数组中的元素时捕获错误。
我正在尝试使用 transitions 库。
这个问题跟在
我想将对 on_enter
事件的监听委托给所有状态,并创建几个这样的监听器,它们可以订阅并在进入状态时收到通知。
就我而言,我想通知外部事件系统根据状态订阅不同的事件配置。
对于这个例子,我将使用状态机(比如固体<->流体<->气体,事件为 [热,冷])。
这可以很容易地使用像这样的库来完成
from transitions import Machine
from transitions import EventData
class Matter(object):
def __init__(self):
transitions = [
{'trigger': 'heat', 'source': 'solid', 'dest': 'liquid'},
{'trigger': 'heat', 'source': 'liquid', 'dest': 'gas'},
{'trigger': 'cool', 'source': 'gas', 'dest': 'liquid'},
{'trigger': 'cool', 'source': 'liquid', 'dest': 'solid'}
]
self.machine = Machine(
model=self,
states=['solid', 'liquid', 'gas'],
transitions=transitions,
initial='solid',
send_event=True
)
def on_enter_gas(self, event: EventData):
print(f"entering gas from {event.transition.source}")
def on_enter_liquid(self, event: EventData):
print(f"entering liquid from {event.transition.source}")
def on_enter_solid(self, event: EventData):
print(f"entering solid from {event.transition.source}")
matter = Matter()
matter.heat() # entering liquid from solid
matter.heat() # entering gas from liquid
matter.cool() # entering liquid from gas
matter.cool() # entering solid from liquid
太棒了!现在,我想通过订阅向外部通知有关 on_enter
个事件。
我想以一种最少将外部世界耦合到机器内部的方式来做到这一点,这样如果我要更改状态名称,或者添加或删除状态,我就不会担心破坏任何用户机器。
我可以实现的一种方法如下,缺点是耦合到机器内部,并迫使我自己实现库的很多功能。
from transitions import Machine
from transitions import EventData
from typing import Callable
class Matter(object):
states = ['solid', 'liquid', 'gas']
def __init__(self):
transitions = [
{'trigger': 'heat', 'source': 'solid', 'dest': 'liquid'},
{'trigger': 'heat', 'source': 'liquid', 'dest': 'gas'},
{'trigger': 'cool', 'source': 'gas', 'dest': 'liquid'},
{'trigger': 'cool', 'source': 'liquid', 'dest': 'solid'}
]
self.machine = Machine(
model=self,
states=self.states,
transitions=transitions,
initial='solid',
send_event=True
)
self._subscriptions = {}
def on_enter_gas(self, event: EventData):
print(f"entering gas from {event.transition.source}")
if "on_enter_gas" in self._subscriptions:
self._subscriptions["on_enter_solid"]()
def on_enter_liquid(self, event: EventData):
print(f"entering liquid from {event.transition.source}")
if "on_enter_liquid" in self._subscriptions:
self._subscriptions["on_enter_solid"]()
def on_enter_solid(self, event: EventData):
print(f"entering solid from {event.transition.source}")
if "on_enter_solid" in self._subscriptions:
self._subscriptions["on_enter_solid"]()
def subscribe(self, state: str, trigger: str, callback: Callable):
assert state in self.states
machine_event = trigger + "_" + state
if machine_event not in self._subscriptions:
self._subscriptions[machine_event] = callback
这允许为任何状态添加外部回调。
根据评论
即使库确实可以做到这一点,我认为这还不够。
任何订阅者都必须知道机器的状态才能订阅on_enter_solid
。
我最想做的是有一些监听器 class 我可以继承(或以其他方式)并且只实现我需要监听的方法,外部 .
完成此操作的最佳方法是什么,或者使用库进行类似操作?
I want to notify externally, via subscriptions about on_enter events. I want to do that in a way that would least couple the outside world to the insides of the machine, so that if I were to change a state name, or add or remove a state, I wouldn't worry about breaking any users of the machine.
耦合度最低的是只转发事件并让订阅者决定如何处理它:
from transitions import Machine
from transitions import EventData
from typing import Callable
class Observer:
def state_changed(self, event_data: EventData):
print(f"state is now '{event_data.state.name}'")
class SubscribableMachine(Machine):
states = ['solid', 'liquid', 'gas']
transitions = [
{'trigger': 'heat', 'source': 'solid', 'dest': 'liquid'},
{'trigger': 'heat', 'source': 'liquid', 'dest': 'gas'},
{'trigger': 'cool', 'source': 'gas', 'dest': 'liquid'},
{'trigger': 'cool', 'source': 'liquid', 'dest': 'solid'}
]
def __init__(self):
super().__init__(states=self.states, transitions=self.transitions,
initial='solid', after_state_change="notify",
send_event=True)
self._subscriptions = []
def notify(self, event_data: EventData):
for func in self._subscriptions:
func(event_data)
def subscribe(self, func: Callable):
self._subscriptions.append(func)
machine = SubscribableMachine()
observer = Observer()
machine.subscribe(observer.state_changed)
machine.heat() # >>> state is now 'LIQUID'
如果您让观察者订阅特定的转换 and/or 状态事件,这显然会在您稍后重命名这些事件时破坏它们的代码。但是,在我看来,仅传递事件会大大降低状态机和一般状态模式的实用性,因为它是状态模式中最好的部分之一,它摆脱了 if-elif-else-cascades。
What I would ideally like to do is have some listener class I can inherit (or otherwise) and only implement the methods I need to listen to, externally.
我会说你不需要特定的听众 class。您可以直接将可调用对象添加到状态 enter/exit 回调中。此外,您可以将字符串替换为 (string) Enums 作为状态标识符。这样,您可以在不影响观察者的情况下更改 Enum 的值。这可以防止在订阅特定状态时出现拼写错误:
from transitions import Machine
from transitions import EventData
from typing import Callable
from enum import Enum, auto
class Observer:
def state_changed(self, event_data: EventData):
print(f"state is now '{event_data.state.name}'")
class State(Enum):
SOLID = auto()
LIQUID = auto()
GAS = auto()
class SubscribableMachine(Machine):
transitions = [
{'trigger': 'heat', 'source': State.SOLID, 'dest': State.LIQUID},
{'trigger': 'heat', 'source': State.LIQUID, 'dest': State.GAS},
{'trigger': 'cool', 'source': State.GAS, 'dest': State.LIQUID},
{'trigger': 'cool', 'source': State.LIQUID, 'dest': State.SOLID}
]
def __init__(self):
super().__init__(states=State, transitions=self.transitions,
initial=State.SOLID, send_event=True)
def subscribe(self, func: Callable, state: State):
self.get_state(state).on_enter.append(func)
def unsubscribe(self, func: Callable, state: State):
self.get_state(state).on_enter.remove(func)
machine = SubscribableMachine()
observer = Observer()
machine.subscribe(observer.state_changed, State.LIQUID)
machine.heat() # >>> state is now 'LIQUID'
machine.heat()
assert machine.state == State.GAS
machine.unsubscribe(observer.state_changed, State.LIQUID)
machine.cool() # no output
assert machine.state == State.LIQUID
What is the syntax to subscribe in the same way to specific transitions?
对于转换,您可以使用 machine.get_transitions(trigger, source, dest)
来获取一组转换。正如文档中提到的(例如 Callback execution order),转换具有两个回调事件:before
和 after
。如果您想在转换发生后(也在调用 State.enter
之后)收到通知,您的 subscribe/unsubscribe 方法可能如下所示:
def subscribe(self, func, trigger="", source="*", dest="*"):
for transition in self.get_transitions(trigger, source, dest):
transition.after.append(func)
def unsubscribe(self, func, trigger="", source="*", dest="*"):
for transition in self.get_transitions(trigger, source, dest):
transition.after.remove(func)
# ...
machine.subscribe(observer.state_changed, "heat")
machine.heat() >>> state is now 'LIQUID'
machine.heat() >>> state is now 'GAS'
您可以改为 before
并查看 state_changed
的输出如何变化。此外,您可以通过 source
或 destination
进一步缩小范围:
machine.subscribe(observer.state_changed, "heat", source=State.LIQUID)
# ...
machine.heat() >>> <nothing>
machine.heat() >>> state is now 'GAS'
要取消订阅,您需要记住过滤器设置或在 list.remove
尝试删除不在回调数组中的元素时捕获错误。