更深层次的嵌套转换
Deeper nested transitions
我正在尝试使用具有三个嵌套级别的分层机器,称为 main -> nested -> deeper
。我希望状态机一个接一个地执行,然后将状态重新映射回第一台机器。所以我希望最终状态是 done
,但它是 nested_deeper_working
,所以显然我遗漏了一些东西。
这里的解决方法是使用 queued=False
,然后它会按预期工作。但缺点是调用堆栈真的很长,出现错误时的回溯也很长。
很抱歉,我无法缩短示例的长度。在现实生活中,我使用 MainMachine
作为整体生产控制,它启动较小的机器来擦除、闪存、校准或测试设备。这些由 NestedMachine
表示。这些机器内部是使用的最小机器,例如。对于硬重置,一个测试序列等等。这是本例中的 DeeperMachine
。
pytransitions 0.8.10
python 3.7.3
GenericMachine
class只是一个抽象class。这里我定义了默认状态 initial
和 done
以及基本配置。
from transitions.extensions import HierarchicalMachine
class GenericMachine(HierarchicalMachine):
def __init__(self, states, transitions, model=None):
generic_states = [
{"name": "initial", "on_enter": self.entry_initial},
{"name": "done", "on_enter": self.entry_done},
]
states += generic_states
super().__init__(
states=states,
transitions=transitions,
model=model,
send_event=True,
queued=True,
)
def entry_initial(self, event_data):
raise NotImplementedError
def entry_done(self, event_data):
raise NotImplementedError
MainMachine
是层次结构中最高的机器,它启动 NestedMachine
。预计所有嵌套机器执行完毕后,执行done
状态
class MainMachine(GenericMachine):
def __init__(self):
nested = NestedMachine()
remap = {"done": "done"}
states = [
{"name": "nested", "children": nested, "remap": remap},
]
transitions = [
["go", "initial", "nested"],
]
super().__init__(states, transitions, model=self)
def entry_done(self, event_data):
print("job finished")
NestedMachine
作为第二层嵌套。它启动 DeeperMachine
并重新映射 done
状态。
class NestedMachine(GenericMachine):
def __init__(self):
deeper = DeeperMachine()
remap = {"done": "done"}
states = [
{"name": "deeper", "children": deeper, "remap": remap},
]
transitions = [
["go", "initial", "deeper"],
]
super().__init__(states, transitions)
def entry_initial(self, event_data):
event_data.model.go()
第三层嵌套由DeeperMachine
实现。工作完成后它触发go
事件转换到done
状态并通过NestedMachine
跳回MainMachine
class DeeperMachine(GenericMachine):
def __init__(self):
states = [
{"name": "working", "on_enter": self.entry_working},
]
transitions = [
["go", "initial", "working"],
["go", "working", "done"],
]
super().__init__(states, transitions, model=self)
def entry_initial(self, event_data):
event_data.model.go()
def entry_working(self, event_data):
event_data.model.go()
测试实例化 MainMachine
并触发第一个事件。预计嵌套机器将被调用,并且在作业完成后,它将通过 done
状态重新映射,回到 MainMachine
.
import logging as log
def main():
log.basicConfig(level=log.DEBUG)
log.getLogger("transitions").setLevel(log.INFO)
machine = MainMachine()
machine.go()
assert machine.state == "done"
if __name__ == "__main__":
main()
确认为错误
https://github.com/pytransitions/transitions/issues/554
已在 dev-0.9 中解决,示例和生产代码对我来说效果很好。
我正在尝试使用具有三个嵌套级别的分层机器,称为 main -> nested -> deeper
。我希望状态机一个接一个地执行,然后将状态重新映射回第一台机器。所以我希望最终状态是 done
,但它是 nested_deeper_working
,所以显然我遗漏了一些东西。
这里的解决方法是使用 queued=False
,然后它会按预期工作。但缺点是调用堆栈真的很长,出现错误时的回溯也很长。
很抱歉,我无法缩短示例的长度。在现实生活中,我使用 MainMachine
作为整体生产控制,它启动较小的机器来擦除、闪存、校准或测试设备。这些由 NestedMachine
表示。这些机器内部是使用的最小机器,例如。对于硬重置,一个测试序列等等。这是本例中的 DeeperMachine
。
pytransitions 0.8.10
python 3.7.3
GenericMachine
class只是一个抽象class。这里我定义了默认状态 initial
和 done
以及基本配置。
from transitions.extensions import HierarchicalMachine
class GenericMachine(HierarchicalMachine):
def __init__(self, states, transitions, model=None):
generic_states = [
{"name": "initial", "on_enter": self.entry_initial},
{"name": "done", "on_enter": self.entry_done},
]
states += generic_states
super().__init__(
states=states,
transitions=transitions,
model=model,
send_event=True,
queued=True,
)
def entry_initial(self, event_data):
raise NotImplementedError
def entry_done(self, event_data):
raise NotImplementedError
MainMachine
是层次结构中最高的机器,它启动 NestedMachine
。预计所有嵌套机器执行完毕后,执行done
状态
class MainMachine(GenericMachine):
def __init__(self):
nested = NestedMachine()
remap = {"done": "done"}
states = [
{"name": "nested", "children": nested, "remap": remap},
]
transitions = [
["go", "initial", "nested"],
]
super().__init__(states, transitions, model=self)
def entry_done(self, event_data):
print("job finished")
NestedMachine
作为第二层嵌套。它启动 DeeperMachine
并重新映射 done
状态。
class NestedMachine(GenericMachine):
def __init__(self):
deeper = DeeperMachine()
remap = {"done": "done"}
states = [
{"name": "deeper", "children": deeper, "remap": remap},
]
transitions = [
["go", "initial", "deeper"],
]
super().__init__(states, transitions)
def entry_initial(self, event_data):
event_data.model.go()
第三层嵌套由DeeperMachine
实现。工作完成后它触发go
事件转换到done
状态并通过NestedMachine
跳回MainMachine
class DeeperMachine(GenericMachine):
def __init__(self):
states = [
{"name": "working", "on_enter": self.entry_working},
]
transitions = [
["go", "initial", "working"],
["go", "working", "done"],
]
super().__init__(states, transitions, model=self)
def entry_initial(self, event_data):
event_data.model.go()
def entry_working(self, event_data):
event_data.model.go()
测试实例化 MainMachine
并触发第一个事件。预计嵌套机器将被调用,并且在作业完成后,它将通过 done
状态重新映射,回到 MainMachine
.
import logging as log
def main():
log.basicConfig(level=log.DEBUG)
log.getLogger("transitions").setLevel(log.INFO)
machine = MainMachine()
machine.go()
assert machine.state == "done"
if __name__ == "__main__":
main()
确认为错误
https://github.com/pytransitions/transitions/issues/554
已在 dev-0.9 中解决,示例和生产代码对我来说效果很好。