python 转换库 - 在从单一状态转换之前等待多个事件?
python transitions lib - Wait for multiple events before transition from single state possible?
post 来自 github 问题 https://github.com/pytransitions/transitions/issues/247。
我正在使用 python 的转换 FSM 库,在转换到下一个状态之前我需要等待 3 件事发生。
State = WAIT_FOR_3_THINGS
Events = thing_1_done, thing_2_done, thing_3_done
thing_*_events 可以按任何顺序出现,所以我希望避免出现:
thing_1_and_2_done
、thing_1_and_3_done
、thing_2_and_3_done
州
我可以跟踪事件 1、2 和 3 并以
前进
conditions=[thing_1_and_2_done, thing_1_and_3_done, thing_2_and_3_done]
但我不确定在哪里最好聚合这些事件发生。
是否有更多的 skookum(好的,pythonic)方法来做到这一点?
最终使用集合来定义我想在转换到下一个状态之前积累的事件列表。
例如:
deps_required = {'got_thing_1', 'got_thing_2', 'got_thing_3'}
将实例变量添加到 FSM 实例以跟踪在进入 'idle' 状态时清除的这个:
self.run_deps_set = set()
然后,聚合并检查条件子句中的事件子类型:
def if_run_deps_ready(self, event):
self.run_deps_set.add(event.args[0])
return self.run_deps_set.issuperset(deps_required)
感觉不是超级干净,但我不讨厌它(太多)。
不知道这是否是一个很好的,甚至是好的解决方案,但您可以考虑将每个事物作为一个函数进行线程化或多处理,并在每个事物的末尾包含一个 var,它在完成时翻转为 true。
然后一个简单的 if a-complete、b-complete、c-complete bools-are-true 检查应该可以让你到达你想去的地方。
从务实的角度来看,您的 是过渡的方式,恕我直言。对于这样的问题,并发状态机可能值得一看。
对于这个特殊情况,这肯定是过头了,但我想借此机会展示转换如何处理并发。
更准确地说,我们可以利用它处理多个模型的能力来实现并发。
在下面的示例中,我扩展了 Machine
a) 一种在所有模型上触发事件的方法和 b) 一种初始化工作模型并将实际模型存储在内部变量中的方法(虽然这不是一个合适的堆栈).
使用 ignore_invalid_triggers
,每个工作人员都等待相应的事件,并在完成后从 ParallelMachine
模型列表中删除自己。当模型列表为空时,ParallelMachine
再次恢复实际模型状态。
from transitions import Machine
class WorkerModel:
def __init__(self, machine):
self.machine = machine
# I show myself out when I am done
def on_enter_Done(self):
self.machine.remove_model(self)
class ParallelMachine(Machine):
def __init__(self, *args, **kwargs):
self._pushed_models = []
super(ParallelMachine, self).__init__(*args, **kwargs)
# save actual models and initialise worker tasks
def add_worker(self, tasks):
self._pushed_models = self.models
self.models = []
for t in tasks:
self.add_model(WorkerModel(self), initial=t)
# trigger an event on ALL models (either workers or actual models)
def trigger(self, trigger, *args, **kwargs):
for m in self.models:
getattr(m, trigger)(*args, **kwargs)
if not self.models:
self.models = self._pushed_models
self._pushed_models = []
for m in self.models:
getattr(m, trigger)(*args, **kwargs)
class Model:
def __init__(self):
# the state list contains model states as well as worker states
states = ['Idle', 'Processing', 'Done', 'need_1', 'need_2', 'need_3']
# potentially got_1/2/3 all can trigger a transition to 'Done'
# but the model will only be triggered when the worker list
# has been emptied before
transitions = [['process', 'Idle', 'Processing'],
['got_1', ['Processing', 'need_1'], 'Done'],
['got_2', ['Processing', 'need_2'], 'Done'],
['got_3', ['Processing', 'need_3'], 'Done']]
# assigning machine to a model attribute prevents the need
# to pass it around
self.machine = ParallelMachine(model=self, states=states,
transitions=transitions, initial='Idle',
ignore_invalid_triggers=True)
def on_enter_Processing(self):
self.machine.add_worker(['need_1', 'need_2', 'need_3'])
def on_enter_Done(self):
print("I am done")
model = Model()
machine = model.machine
assert model.state == 'Idle'
# we use ParallelMachine.trigger instead of model.<event_name>
machine.trigger('process')
assert model.state == 'Processing'
machine.trigger('got_3')
machine.trigger('got_1')
# duplicated events do not matter
machine.trigger('got_3')
assert model.state == 'Processing'
machine.trigger('got_2')
# worker stack was empty and
assert model.state == 'Done'
这样全局状态可以是 "Processing(need_1/need_2/done)" 或 "Processing(done/need_2/done)" 而无需显式建模所有变体。
并发在 transitions
待办事项列表中,但需要进行一些更改。例如,回调应该在所有模型中可用(例如,从 Model
中删除 on_enter_Done
将引发异常)。
post 来自 github 问题 https://github.com/pytransitions/transitions/issues/247。
我正在使用 python 的转换 FSM 库,在转换到下一个状态之前我需要等待 3 件事发生。
State = WAIT_FOR_3_THINGS
Events = thing_1_done, thing_2_done, thing_3_done
thing_*_events 可以按任何顺序出现,所以我希望避免出现:
thing_1_and_2_done
、thing_1_and_3_done
、thing_2_and_3_done
州
我可以跟踪事件 1、2 和 3 并以
前进conditions=[thing_1_and_2_done, thing_1_and_3_done, thing_2_and_3_done]
但我不确定在哪里最好聚合这些事件发生。
是否有更多的 skookum(好的,pythonic)方法来做到这一点?
最终使用集合来定义我想在转换到下一个状态之前积累的事件列表。
例如:
deps_required = {'got_thing_1', 'got_thing_2', 'got_thing_3'}
将实例变量添加到 FSM 实例以跟踪在进入 'idle' 状态时清除的这个:
self.run_deps_set = set()
然后,聚合并检查条件子句中的事件子类型:
def if_run_deps_ready(self, event):
self.run_deps_set.add(event.args[0])
return self.run_deps_set.issuperset(deps_required)
感觉不是超级干净,但我不讨厌它(太多)。
不知道这是否是一个很好的,甚至是好的解决方案,但您可以考虑将每个事物作为一个函数进行线程化或多处理,并在每个事物的末尾包含一个 var,它在完成时翻转为 true。
然后一个简单的 if a-complete、b-complete、c-complete bools-are-true 检查应该可以让你到达你想去的地方。
从务实的角度来看,您的
对于这个特殊情况,这肯定是过头了,但我想借此机会展示转换如何处理并发。
更准确地说,我们可以利用它处理多个模型的能力来实现并发。
在下面的示例中,我扩展了 Machine
a) 一种在所有模型上触发事件的方法和 b) 一种初始化工作模型并将实际模型存储在内部变量中的方法(虽然这不是一个合适的堆栈).
使用 ignore_invalid_triggers
,每个工作人员都等待相应的事件,并在完成后从 ParallelMachine
模型列表中删除自己。当模型列表为空时,ParallelMachine
再次恢复实际模型状态。
from transitions import Machine
class WorkerModel:
def __init__(self, machine):
self.machine = machine
# I show myself out when I am done
def on_enter_Done(self):
self.machine.remove_model(self)
class ParallelMachine(Machine):
def __init__(self, *args, **kwargs):
self._pushed_models = []
super(ParallelMachine, self).__init__(*args, **kwargs)
# save actual models and initialise worker tasks
def add_worker(self, tasks):
self._pushed_models = self.models
self.models = []
for t in tasks:
self.add_model(WorkerModel(self), initial=t)
# trigger an event on ALL models (either workers or actual models)
def trigger(self, trigger, *args, **kwargs):
for m in self.models:
getattr(m, trigger)(*args, **kwargs)
if not self.models:
self.models = self._pushed_models
self._pushed_models = []
for m in self.models:
getattr(m, trigger)(*args, **kwargs)
class Model:
def __init__(self):
# the state list contains model states as well as worker states
states = ['Idle', 'Processing', 'Done', 'need_1', 'need_2', 'need_3']
# potentially got_1/2/3 all can trigger a transition to 'Done'
# but the model will only be triggered when the worker list
# has been emptied before
transitions = [['process', 'Idle', 'Processing'],
['got_1', ['Processing', 'need_1'], 'Done'],
['got_2', ['Processing', 'need_2'], 'Done'],
['got_3', ['Processing', 'need_3'], 'Done']]
# assigning machine to a model attribute prevents the need
# to pass it around
self.machine = ParallelMachine(model=self, states=states,
transitions=transitions, initial='Idle',
ignore_invalid_triggers=True)
def on_enter_Processing(self):
self.machine.add_worker(['need_1', 'need_2', 'need_3'])
def on_enter_Done(self):
print("I am done")
model = Model()
machine = model.machine
assert model.state == 'Idle'
# we use ParallelMachine.trigger instead of model.<event_name>
machine.trigger('process')
assert model.state == 'Processing'
machine.trigger('got_3')
machine.trigger('got_1')
# duplicated events do not matter
machine.trigger('got_3')
assert model.state == 'Processing'
machine.trigger('got_2')
# worker stack was empty and
assert model.state == 'Done'
这样全局状态可以是 "Processing(need_1/need_2/done)" 或 "Processing(done/need_2/done)" 而无需显式建模所有变体。
并发在 transitions
待办事项列表中,但需要进行一些更改。例如,回调应该在所有模型中可用(例如,从 Model
中删除 on_enter_Done
将引发异常)。