无法获得 运行 https://github.com/tyarkoni/transitions/issues/198 中显示的超时机制。
Not able to get running the timeout mechanism shown in https://github.com/tyarkoni/transitions/issues/198.
需要你的帮助。
正在尝试为此处提到的我的状态机设置超时功能 transitions library。在机器初始化时继续接收:
TypeError: init() got an unexpected keyword argument 'parent'
虽然传播了所有 cunstructor 参数。
我的代码(Python 2.7.6/IPython 5.1.0):
from transitions.extensions import HierarchicalMachine as Machine
import time
from threading import Thread
from transitions import State
class Timeout(Thread):
def __init__(self, func, timeout):
super(Timeout, self).__init__()
self.func = func
self.timeout = timeout
self.canceled = False
print 'Starting countdown'
self.start()
def run(self):
time.sleep(self.timeout)
print 'Timeout occurred'
if not self.canceled:
self.func()
class TimeoutState(State):
def __init__(self, name, timeout=None, *args, **kwargs):
self.timeout = timeout
super(TimeoutState, self).__init__(name=name, *args, **kwargs)
class TimeoutMachine(Machine):
def __init__(self, model, states, transitions, initial, title):
super(Machine, self).__init__( model=model,
states=states,
transitions=transitions,
initial=initial,
name=title,
queued=True,
auto_transitions=True,
send_event=True)
def _create_state(self, *args, **kwargs):
return TimeoutState(*args, **kwargs)
class Model():
def __init__(self):
self.timer = None
def set_timeout(self, event_data):
timeout=event_data.kwargs.pop('timeout',3)
self.timer = Timeout(self.doTimeout, timeout)
model = Model()
transitions = [{'doTimeout', 'B', 'C'}]
b = TimeoutState(name='B',timeout=5, on_enter='set_timeout')
machine = TimeoutMachine(model=model, states=['A', b, 'C'],
transitions=transitions, initial='A',
title='TestMachineWithTimeouts')
错误堆栈:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/IPython/core/interactiveshell.py", line 2881, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-40-747d39e2dc66>", line 1, in <module>
t.TimeoutMachine(model=model, states=['A', b, 'C'], transitions=transitions, initial='A', title='TestMachineWithTimeouts')
File "/home/sge/workspace_neon/ocpp16j_dev/_test_.py", line 61, in __init__
send_event=True)
File "build/bdist.linux-i686/egg/transitions/core.py", line 364, in __init__
self.add_states(states)
File "build/bdist.linux-i686/egg/transitions/extensions/nesting.py", line 264, in add_states
new_states = self.traverse(states, *args, **kwargs)
File "build/bdist.linux-i686/egg/transitions/extensions/nesting.py", line 183, in traverse
ignore_invalid_triggers=ignore))
File "/home/sge/workspace_neon/ocpp16j_dev/_test_.py", line 78, in _create_state
return TimeoutState(*args, **kwargs)
File "/home/sge/workspace_neon/ocpp16j_dev/_test_.py", line 46, in __init__
super(TimeoutState, self).__init__(name=name, *args, **kwargs)
TypeError: __init__() got an unexpected keyword argument 'parent'
这有什么问题吗?有什么线索吗?
谢谢!
您的示例存在(至少)三个问题:
问题:HierarchicalMachine
需要 transitions.extensions.nesting.NestedState
类型的状态
- 解决方法:只要subclass正确的状态即可。最简单的解决方法是将
from transitions import State
替换为 from transitions.extensions.nesting import NestedState as State
问题:在 TimeoutMachine.__init__
中,您使用基 class 而不是子 class 调用 super
。这实际上会调用 Machine
的基础 class 的 __init__
方法(如果有的话)。
- 解决方法:将
super(Machine, self).__init__
改为super(TimeoutMachine, self).__init__
问题:转换被定义为一个集合,但 transitions
仅支持用于此目的的数组或字典。
- 解决方案:将
transitions = [{'doTimeout', 'B', 'C'}]
更改为 transitions=[['doTimeout', 'B', 'C']]
或 transitions=[{'trigger':'doTimeout', 'source':'B', 'dest':'C'}]
还有一点:既然你sublclass你的TimeoutMachine
并使用TimeoutState
作为默认状态,你就不必手动创建状态。您可以这样定义您的状态:
states = ['A', 'C', {'name': 'B', 'timeout': 5, 'on_enter': 'set_timeout'}]
工作代码:
from transitions.extensions import HierarchicalMachine as Machine
import time
from threading import Thread
from transitions.extensions.nesting import NestedState as State
class Timeout(Thread):
def __init__(self, func, timeout):
super(Timeout, self).__init__()
self.func = func
self.timeout = timeout
self.canceled = False
print 'Starting countdown'
self.start()
def run(self):
time.sleep(self.timeout)
print 'Timeout occurred'
if not self.canceled:
self.func()
class TimeoutState(State):
def __init__(self, name, timeout=None, *args, **kwargs):
self.timeout = timeout
super(TimeoutState, self).__init__(name=name, *args, **kwargs)
class TimeoutMachine(Machine):
def __init__(self, model, states, transitions, initial, title):
super(TimeoutMachine, self).__init__(model=model,
states=states,
transitions=transitions,
initial=initial,
name=title,
queued=True,
auto_transitions=True,
send_event=True)
def _create_state(self, *args, **kwargs):
return TimeoutState(*args, **kwargs)
class Model:
def __init__(self):
self.timer = None
def set_timeout(self, event_data):
timeout = event_data.kwargs.pop('timeout', 3)
self.timer = Timeout(self.doTimeout, timeout)
model = Model()
transitions = [{'trigger': 'doTimeout', 'source':'B', 'dest':'C'}]
states = ['A', 'C', {'name': 'B', 'timeout': 5, 'on_enter': 'set_timeout'}]
machine = TimeoutMachine(model=model, states=states,
transitions=transitions, initial='A',
title='TestMachineWithTimeouts')
model.to_B()
需要你的帮助。
正在尝试为此处提到的我的状态机设置超时功能 transitions library。在机器初始化时继续接收:
TypeError: init() got an unexpected keyword argument 'parent'
虽然传播了所有 cunstructor 参数。
我的代码(Python 2.7.6/IPython 5.1.0):
from transitions.extensions import HierarchicalMachine as Machine
import time
from threading import Thread
from transitions import State
class Timeout(Thread):
def __init__(self, func, timeout):
super(Timeout, self).__init__()
self.func = func
self.timeout = timeout
self.canceled = False
print 'Starting countdown'
self.start()
def run(self):
time.sleep(self.timeout)
print 'Timeout occurred'
if not self.canceled:
self.func()
class TimeoutState(State):
def __init__(self, name, timeout=None, *args, **kwargs):
self.timeout = timeout
super(TimeoutState, self).__init__(name=name, *args, **kwargs)
class TimeoutMachine(Machine):
def __init__(self, model, states, transitions, initial, title):
super(Machine, self).__init__( model=model,
states=states,
transitions=transitions,
initial=initial,
name=title,
queued=True,
auto_transitions=True,
send_event=True)
def _create_state(self, *args, **kwargs):
return TimeoutState(*args, **kwargs)
class Model():
def __init__(self):
self.timer = None
def set_timeout(self, event_data):
timeout=event_data.kwargs.pop('timeout',3)
self.timer = Timeout(self.doTimeout, timeout)
model = Model()
transitions = [{'doTimeout', 'B', 'C'}]
b = TimeoutState(name='B',timeout=5, on_enter='set_timeout')
machine = TimeoutMachine(model=model, states=['A', b, 'C'],
transitions=transitions, initial='A',
title='TestMachineWithTimeouts')
错误堆栈:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/IPython/core/interactiveshell.py", line 2881, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-40-747d39e2dc66>", line 1, in <module>
t.TimeoutMachine(model=model, states=['A', b, 'C'], transitions=transitions, initial='A', title='TestMachineWithTimeouts')
File "/home/sge/workspace_neon/ocpp16j_dev/_test_.py", line 61, in __init__
send_event=True)
File "build/bdist.linux-i686/egg/transitions/core.py", line 364, in __init__
self.add_states(states)
File "build/bdist.linux-i686/egg/transitions/extensions/nesting.py", line 264, in add_states
new_states = self.traverse(states, *args, **kwargs)
File "build/bdist.linux-i686/egg/transitions/extensions/nesting.py", line 183, in traverse
ignore_invalid_triggers=ignore))
File "/home/sge/workspace_neon/ocpp16j_dev/_test_.py", line 78, in _create_state
return TimeoutState(*args, **kwargs)
File "/home/sge/workspace_neon/ocpp16j_dev/_test_.py", line 46, in __init__
super(TimeoutState, self).__init__(name=name, *args, **kwargs)
TypeError: __init__() got an unexpected keyword argument 'parent'
这有什么问题吗?有什么线索吗?
谢谢!
您的示例存在(至少)三个问题:
问题:
类型的状态HierarchicalMachine
需要transitions.extensions.nesting.NestedState
- 解决方法:只要subclass正确的状态即可。最简单的解决方法是将
from transitions import State
替换为from transitions.extensions.nesting import NestedState as State
- 解决方法:只要subclass正确的状态即可。最简单的解决方法是将
问题:在
TimeoutMachine.__init__
中,您使用基 class 而不是子 class 调用super
。这实际上会调用Machine
的基础 class 的__init__
方法(如果有的话)。- 解决方法:将
super(Machine, self).__init__
改为super(TimeoutMachine, self).__init__
- 解决方法:将
问题:转换被定义为一个集合,但
transitions
仅支持用于此目的的数组或字典。- 解决方案:将
transitions = [{'doTimeout', 'B', 'C'}]
更改为transitions=[['doTimeout', 'B', 'C']]
或transitions=[{'trigger':'doTimeout', 'source':'B', 'dest':'C'}]
- 解决方案:将
还有一点:既然你sublclass你的TimeoutMachine
并使用TimeoutState
作为默认状态,你就不必手动创建状态。您可以这样定义您的状态:
states = ['A', 'C', {'name': 'B', 'timeout': 5, 'on_enter': 'set_timeout'}]
工作代码:
from transitions.extensions import HierarchicalMachine as Machine
import time
from threading import Thread
from transitions.extensions.nesting import NestedState as State
class Timeout(Thread):
def __init__(self, func, timeout):
super(Timeout, self).__init__()
self.func = func
self.timeout = timeout
self.canceled = False
print 'Starting countdown'
self.start()
def run(self):
time.sleep(self.timeout)
print 'Timeout occurred'
if not self.canceled:
self.func()
class TimeoutState(State):
def __init__(self, name, timeout=None, *args, **kwargs):
self.timeout = timeout
super(TimeoutState, self).__init__(name=name, *args, **kwargs)
class TimeoutMachine(Machine):
def __init__(self, model, states, transitions, initial, title):
super(TimeoutMachine, self).__init__(model=model,
states=states,
transitions=transitions,
initial=initial,
name=title,
queued=True,
auto_transitions=True,
send_event=True)
def _create_state(self, *args, **kwargs):
return TimeoutState(*args, **kwargs)
class Model:
def __init__(self):
self.timer = None
def set_timeout(self, event_data):
timeout = event_data.kwargs.pop('timeout', 3)
self.timer = Timeout(self.doTimeout, timeout)
model = Model()
transitions = [{'trigger': 'doTimeout', 'source':'B', 'dest':'C'}]
states = ['A', 'C', {'name': 'B', 'timeout': 5, 'on_enter': 'set_timeout'}]
machine = TimeoutMachine(model=model, states=states,
transitions=transitions, initial='A',
title='TestMachineWithTimeouts')
model.to_B()