pytransitions/transitions: 有没有更好的方法来存储访问状态的历史?
pytransitions/transitions: Is there any better way to store the history of visited state?
我最近在 Python 中发现了一个轻量级的、面向对象的状态机实现,称为转换 (https://github.com/pytransitions/transitions)。所以我正在尝试使用这些状态机,尤其是 HierarchicalGraphMachine。我想要的一个很好的功能是存储访问状态的历史,即使机器没有移动(保持在相同状态)。
而且从我从示例中看到的情况来看,我们实际上无法以简单的方式做到这一点,因为当机器状态未更改时 before_state_change
和 after_state_change
未被调用。所以在这种情况下我们不能延长我们的历史。为了解决这个问题,我最终创建了一个 trigger_wrapper 函数:
def trigger_wrapper(self, trigger_name):
previous_state = self.state
result = None
try:
result = self.trigger(trigger_name)
except AttributeError as attribute_err:
print('Invalid trigger name: {}'.format(attribute_err))
except MachineError as machine_err:
print('Valid trigger name but not reachable: {}'.format(machine_err))
except Exception as err:
print('Cannot make transition with unknown error: {}'.format(err))
if result is False:
print('Trigger name reachable but condition(s) was not fulfilled')
....
current_state = self.state
# update history
.....
return result
然后,我们调用 trigger_wrapper 而不是触发器:
before: machine.trigger('drink')
now: machine.trigger_wrapper('drink').
除此之外,通过在初始化Machine
时设置ignore_invalid_triggers = False
并使用这个trigger_wrapper
函数,我们现在可以通过缓存知道机器不能移动的原因例外情况。
有没有更好的方案来保持跟踪访问状态?我认为另一种方法是覆盖触发函数,但由于 NestedState
.
看起来很复杂
编辑 1(遵循@aleneum 的建议)
感谢您的回复以及一个有趣的例子!!!
以使用finalize_event
为例。一切顺利,但这个回调函数似乎不足以捕获以下情况(我在代码中添加了 2 行):
... same setup as before
m.go()
m.internal()
m.reflexive()
m.condition()
m.go() # MachineError: "Can't trigger event go from state B!"
m.goo() # AttributeError: Do not know event named 'goo'.
>>> Expected: ['go', 'internal', 'reflexive', 'condition', 'go', 'goo']
>>> Expected: ['B', 'B', 'B', 'B', 'B', 'B']
换句话说,是否有另一个回调我们可以捕获由调用 invalid trigger
(示例中的 goo)或 valid trigger but not reachable from the current state
(从状态 B 调用 go())引起的异常?
再次感谢您的帮助。
正如您已经提到的,before_state_change
和 after_state_change
仅在发生转换时调用。这并不一定意味着状态更改,因为内部和自反转换也会触发这些回调:
from transitions import Machine
def test():
print("triggered")
m = Machine(states=['A', 'B'], transitions=[
['go', 'A', 'B'],
dict(trigger='internal', source='B', dest=None),
dict(trigger='reflexive', source='B', dest='='),
dict(trigger='condition', source='B', dest='A', conditions=lambda: False)
], after_state_change=test, initial='A')
m.go() # >>> triggered
m.internal() # >>> triggered
m.reflexive() # >>> triggered
m.condition() # no output
这里唯一不触发 after_state_change
的事件是 m.condition
,因为转换被(未满足的)条件停止。
因此,当您的目标是跟踪实际进行的转换时,after_state_change
是正确的选择。如果你想每 trigger/event 记录一次,你可以通过 finalize_event
:
'machine.finalize_event' - callbacks will be executed even if no transition took place or an exception has been raised
from transitions import Machine
event_log = []
state_log = []
def log_trigger(event_data):
event_log.append(event_data.event.name)
state_log.append(event_data.state)
m = Machine(states=['A', 'B'], transitions=[
['go', 'A', 'B'],
dict(trigger='internal', source='B', dest=None),
dict(trigger='reflexive', source='B', dest='='),
dict(trigger='condition', source='B', dest='A', conditions=lambda event_data: False)
], finalize_event=log_trigger, initial='A', send_event=True)
m.go()
m.internal()
m.reflexive()
m.condition()
print(event_log) # >>> ['go', 'internal', 'reflexive', 'condition']
print([state.name for state in state_log]) # >>> ['B', 'B', 'B', 'B']
传递给 finalize_event
的回调将始终被调用,即使转换引发异常。通过设置 send_event=True
,所有回调都将收到一个 EvenData
对象,其中包含事件、状态和转换信息,如果出现问题,还会收到一个错误对象。这是我必须更改条件 lambda 表达式的方法。 send_event=True
时,所有回调都需要能够处理 EventData
对象。
有关 finalize_event
和回调执行顺序的更多信息可以在文档的 this section 中找到。
如何记录无效事件?
finalize_event
仅针对有效事件调用,这意味着该事件必须存在并且在当前源状态下也必须有效。如果要处理所有事件,Machine
需要扩展:
from transitions import Machine
log = []
class LogMachine(Machine):
def _get_trigger(self, model, trigger_name, *args, **kwargs):
res = super(LogMachine, self)._get_trigger(model, trigger_name, *args, **kwargs)
log.append((trigger_name, model.state))
return res
# ...
m = LogMachine(states=..., ignore_invalid_triggers=True)
assert m.trigger("go") # valid
assert not m.trigger("go") # invalid
assert not m.trigger("gooo") # invalid
print(log) # >>> [('go', 'B'), ('go', 'B'), ('gooo', 'B')]
每个模型都装饰有一个 trigger
方法,该方法是 Machine._get_trigger
的一部分,并分配了 model
参数。 Model.trigger
可用于按名称触发事件,也可用于处理不存在的事件。您还需要传递 ignore_invalid_triggers=True
以在事件无效时不引发 MachineError
。
但是,如果所有事件都应该被记录,那么 feasible/maintainable 将日志记录从 Machine
中分离出来并在处理事件的地方处理日志记录可能更 feasible/maintainable,例如:
m = Machine(..., ignore_invalid_triggers=True)
# ...
def on_event(event_name):
logging.debug(f"Received event {event_name}") # or log_event.append(event_name)
m.trigger(event_name)
logging.debug(f"Machine state is {m.state}") # or log_state.append(m.state)
我最近在 Python 中发现了一个轻量级的、面向对象的状态机实现,称为转换 (https://github.com/pytransitions/transitions)。所以我正在尝试使用这些状态机,尤其是 HierarchicalGraphMachine。我想要的一个很好的功能是存储访问状态的历史,即使机器没有移动(保持在相同状态)。
而且从我从示例中看到的情况来看,我们实际上无法以简单的方式做到这一点,因为当机器状态未更改时 before_state_change
和 after_state_change
未被调用。所以在这种情况下我们不能延长我们的历史。为了解决这个问题,我最终创建了一个 trigger_wrapper 函数:
def trigger_wrapper(self, trigger_name):
previous_state = self.state
result = None
try:
result = self.trigger(trigger_name)
except AttributeError as attribute_err:
print('Invalid trigger name: {}'.format(attribute_err))
except MachineError as machine_err:
print('Valid trigger name but not reachable: {}'.format(machine_err))
except Exception as err:
print('Cannot make transition with unknown error: {}'.format(err))
if result is False:
print('Trigger name reachable but condition(s) was not fulfilled')
....
current_state = self.state
# update history
.....
return result
然后,我们调用 trigger_wrapper 而不是触发器:
before: machine.trigger('drink')
now: machine.trigger_wrapper('drink').
除此之外,通过在初始化Machine
时设置ignore_invalid_triggers = False
并使用这个trigger_wrapper
函数,我们现在可以通过缓存知道机器不能移动的原因例外情况。
有没有更好的方案来保持跟踪访问状态?我认为另一种方法是覆盖触发函数,但由于 NestedState
.
编辑 1(遵循@aleneum 的建议)
感谢您的回复以及一个有趣的例子!!!
以使用finalize_event
为例。一切顺利,但这个回调函数似乎不足以捕获以下情况(我在代码中添加了 2 行):
... same setup as before
m.go()
m.internal()
m.reflexive()
m.condition()
m.go() # MachineError: "Can't trigger event go from state B!"
m.goo() # AttributeError: Do not know event named 'goo'.
>>> Expected: ['go', 'internal', 'reflexive', 'condition', 'go', 'goo']
>>> Expected: ['B', 'B', 'B', 'B', 'B', 'B']
换句话说,是否有另一个回调我们可以捕获由调用 invalid trigger
(示例中的 goo)或 valid trigger but not reachable from the current state
(从状态 B 调用 go())引起的异常?
再次感谢您的帮助。
正如您已经提到的,before_state_change
和 after_state_change
仅在发生转换时调用。这并不一定意味着状态更改,因为内部和自反转换也会触发这些回调:
from transitions import Machine
def test():
print("triggered")
m = Machine(states=['A', 'B'], transitions=[
['go', 'A', 'B'],
dict(trigger='internal', source='B', dest=None),
dict(trigger='reflexive', source='B', dest='='),
dict(trigger='condition', source='B', dest='A', conditions=lambda: False)
], after_state_change=test, initial='A')
m.go() # >>> triggered
m.internal() # >>> triggered
m.reflexive() # >>> triggered
m.condition() # no output
这里唯一不触发 after_state_change
的事件是 m.condition
,因为转换被(未满足的)条件停止。
因此,当您的目标是跟踪实际进行的转换时,after_state_change
是正确的选择。如果你想每 trigger/event 记录一次,你可以通过 finalize_event
:
'machine.finalize_event' - callbacks will be executed even if no transition took place or an exception has been raised
from transitions import Machine
event_log = []
state_log = []
def log_trigger(event_data):
event_log.append(event_data.event.name)
state_log.append(event_data.state)
m = Machine(states=['A', 'B'], transitions=[
['go', 'A', 'B'],
dict(trigger='internal', source='B', dest=None),
dict(trigger='reflexive', source='B', dest='='),
dict(trigger='condition', source='B', dest='A', conditions=lambda event_data: False)
], finalize_event=log_trigger, initial='A', send_event=True)
m.go()
m.internal()
m.reflexive()
m.condition()
print(event_log) # >>> ['go', 'internal', 'reflexive', 'condition']
print([state.name for state in state_log]) # >>> ['B', 'B', 'B', 'B']
传递给 finalize_event
的回调将始终被调用,即使转换引发异常。通过设置 send_event=True
,所有回调都将收到一个 EvenData
对象,其中包含事件、状态和转换信息,如果出现问题,还会收到一个错误对象。这是我必须更改条件 lambda 表达式的方法。 send_event=True
时,所有回调都需要能够处理 EventData
对象。
有关 finalize_event
和回调执行顺序的更多信息可以在文档的 this section 中找到。
如何记录无效事件?
finalize_event
仅针对有效事件调用,这意味着该事件必须存在并且在当前源状态下也必须有效。如果要处理所有事件,Machine
需要扩展:
from transitions import Machine
log = []
class LogMachine(Machine):
def _get_trigger(self, model, trigger_name, *args, **kwargs):
res = super(LogMachine, self)._get_trigger(model, trigger_name, *args, **kwargs)
log.append((trigger_name, model.state))
return res
# ...
m = LogMachine(states=..., ignore_invalid_triggers=True)
assert m.trigger("go") # valid
assert not m.trigger("go") # invalid
assert not m.trigger("gooo") # invalid
print(log) # >>> [('go', 'B'), ('go', 'B'), ('gooo', 'B')]
每个模型都装饰有一个 trigger
方法,该方法是 Machine._get_trigger
的一部分,并分配了 model
参数。 Model.trigger
可用于按名称触发事件,也可用于处理不存在的事件。您还需要传递 ignore_invalid_triggers=True
以在事件无效时不引发 MachineError
。
但是,如果所有事件都应该被记录,那么 feasible/maintainable 将日志记录从 Machine
中分离出来并在处理事件的地方处理日志记录可能更 feasible/maintainable,例如:
m = Machine(..., ignore_invalid_triggers=True)
# ...
def on_event(event_name):
logging.debug(f"Received event {event_name}") # or log_event.append(event_name)
m.trigger(event_name)
logging.debug(f"Machine state is {m.state}") # or log_state.append(m.state)