pytransitions/transitions: 有没有更好的方法来存储访问状态的历史?

pytransitions/transitions: Is there any better way to store the history of visited state?

我最近在 Python 中发现了一个轻量级的、面向对象的状态机实现,称为转换 (https://github.com/pytransitions/transitions)。所以我正在尝试使用这些状态机,尤其是 HierarchicalGraphMachine。我想要的一个很好的功能是存储访问状态的历史,即使机器没有移动(保持在相同状态)。

而且从我从示例中看到的情况来看,我们实际上无法以简单的方式做到这一点,因为当机器状态未更改时 before_state_changeafter_state_change 未被调用。所以在这种情况下我们不能延长我们的历史。为了解决这个问题,我最终创建了一个 trigger_wrapper 函数:

def trigger_wrapper(self, trigger_name):
        
        previous_state = self.state

        result = None
        try:
            result = self.trigger(trigger_name)
        except AttributeError as attribute_err:
            print('Invalid trigger name: {}'.format(attribute_err))
        except MachineError as machine_err:
            print('Valid trigger name but not reachable: {}'.format(machine_err))
        except Exception as err:
            print('Cannot make transition with unknown error: {}'.format(err))
        
        if result is False:
            print('Trigger name reachable but condition(s) was not fulfilled')
            ....

        current_state = self.state

        # update history
        ..... 

        return result

然后,我们调用 trigger_wrapper 而不是触发器:

before: machine.trigger('drink')
now:    machine.trigger_wrapper('drink').

除此之外,通过在初始化Machine时设置ignore_invalid_triggers = False并使用这个trigger_wrapper函数,我们现在可以通过缓存知道机器不能移动的原因例外情况。

有没有更好的方案来保持跟踪访问状态?我认为另一种方法是覆盖触发函数,但由于 NestedState.

看起来很复杂

编辑 1(遵循@aleneum 的建议)

感谢您的回复以及一个有趣的例子!!!

以使用finalize_event为例。一切顺利,但这个回调函数似乎不足以捕获以下情况(我在代码中添加了 2 行):

... same setup as before
m.go()
m.internal()
m.reflexive()
m.condition()
m.go()    # MachineError: "Can't trigger event go from state B!"
m.goo()   # AttributeError: Do not know event named 'goo'.

>>> Expected: ['go', 'internal', 'reflexive', 'condition', 'go', 'goo'] 
>>> Expected: ['B', 'B', 'B', 'B', 'B', 'B']

换句话说,是否有另一个回调我们可以捕获由调用 invalid trigger(示例中的 goo)或 valid trigger but not reachable from the current state(从状态 B 调用 go())引起的异常?

再次感谢您的帮助。

正如您已经提到的,before_state_changeafter_state_change 仅在发生转换时调用。这并不一定意味着状态更改,因为内部和自反转换也会触发这些回调:

from transitions import Machine


def test():
    print("triggered")


m = Machine(states=['A', 'B'], transitions=[
    ['go', 'A', 'B'],
    dict(trigger='internal', source='B', dest=None),
    dict(trigger='reflexive', source='B', dest='='),
    dict(trigger='condition', source='B', dest='A', conditions=lambda: False)
], after_state_change=test, initial='A')


m.go()  # >>>  triggered
m.internal()  # >>> triggered
m.reflexive()  # >>> triggered
m.condition()  # no output

这里唯一不触发 after_state_change 的事件是 m.condition,因为转换被(未满足的)条件停止。

因此,当您的目标是跟踪实际进行的转换时,after_state_change 是正确的选择。如果你想每 trigger/event 记录一次,你可以通过 finalize_event:

'machine.finalize_event' - callbacks will be executed even if no transition took place or an exception has been raised

from transitions import Machine


event_log = []
state_log = []


def log_trigger(event_data):
    event_log.append(event_data.event.name)
    state_log.append(event_data.state)


m = Machine(states=['A', 'B'], transitions=[
    ['go', 'A', 'B'],
    dict(trigger='internal', source='B', dest=None),
    dict(trigger='reflexive', source='B', dest='='),
    dict(trigger='condition', source='B', dest='A', conditions=lambda event_data: False)
], finalize_event=log_trigger, initial='A', send_event=True)


m.go()
m.internal()
m.reflexive()
m.condition()

print(event_log)  # >>> ['go', 'internal', 'reflexive', 'condition']
print([state.name for state in state_log])  # >>> ['B', 'B', 'B', 'B']

传递给 finalize_event 的回调将始终被调用,即使转换引发异常。通过设置 send_event=True,所有回调都将收到一个 EvenData 对象,其中包含事件、状态和转换信息,如果出现问题,还会收到一个错误对象。这是我必须更改条件 lambda 表达式的方法。 send_event=True 时,所有回调都需要能够处理 EventData 对象。

有关 finalize_event 和回调执行顺序的更多信息可以在文档的 this section 中找到。

如何记录无效事件?

finalize_event 仅针对有效事件调用,这意味着该事件必须存在并且在当前源状态下也必须有效。如果要处理所有事件,Machine 需要扩展:

from transitions import Machine

log = []


class LogMachine(Machine):

    def _get_trigger(self, model, trigger_name, *args, **kwargs):
        res = super(LogMachine, self)._get_trigger(model, trigger_name, *args, **kwargs)
        log.append((trigger_name, model.state))
        return res

# ...
m = LogMachine(states=..., ignore_invalid_triggers=True)
assert m.trigger("go")  # valid
assert not m.trigger("go")  # invalid
assert not m.trigger("gooo")  # invalid
print(log)  # >>> [('go', 'B'), ('go', 'B'), ('gooo', 'B')]

每个模型都装饰有一个 trigger 方法,该方法是 Machine._get_trigger 的一部分,并分配了 model 参数。 Model.trigger 可用于按名称触发事件,也可用于处理不存在的事件。您还需要传递 ignore_invalid_triggers=True 以在事件无效时不引发 MachineError

但是,如果所有事件都应该被记录,那么 feasible/maintainable 将日志记录从 Machine 中分离出来并在处理事件的地方处理日志记录可能更 feasible/maintainable,例如:

m = Machine(..., ignore_invalid_triggers=True)
# ...
def on_event(event_name):
   logging.debug(f"Received event {event_name}")  # or log_event.append(event_name)
   m.trigger(event_name)
   logging.debug(f"Machine state is {m.state}")  # or log_state.append(m.state)