pytransitions,超时,内存中,持久化到数据库
pytransitions, timeout, in-memory, persisting to database
关于我目前在我的一个项目中使用的 pyTransitions 包的问题。
我一开始就在评估不同的软件包时进行了测试,其中包括超时功能,我知道我会在以后的某个地方需要它。
然后我逐渐选择使用 sqlalchemy 将我所有的有限状态机(实际上只是一个 id 和状态)保存到磁盘,并且只在需要触发转换时重新加载它们 - 这非常有效。
不幸的是,现在又需要超时处理,甚至在尝试将其集成到我的代码中之前,我很确定这对我来说不起作用。
我想我说的很明显:为了在一组(可能更大的)fsms 上正确处理超时,它们必须作为活动对象存在于内存中,而不仅仅是从数据库加载?
这是您已经遇到过的用例吗?是否有某种方法也可以访问此超时计数器,以便随时通过适当的修复来持久化和重新加载它,以便允许超时机制重新站起来,即使该对象在 RAM 中不存在实际超时?
如果没有简单的内置替代方案,我想我会在 RAM 中创建一个常驻对象池,定期保留它们并在我的应用程序出现故障时重新加载它们? (我的具体场景使用了 sqlalchemy,但我想这同样适用于 pickle)
提前为任何想法或建议欢呼和感谢
乔尔
目前还没有恢复超时的内置功能。但是,有一些方法可以将机器从(字典)配置转换为(字典)配置。扩展名为 MarkupMachine
并在 FAQ notebook.
中提到
我们需要的是一个超时状态 class,它存储有关何时应触发并可根据此信息恢复的信息。我们还需要 MarkupMachine
来处理这些自定义状态信息。 MarkupMachine._convert_models
将模型及其当前状态转换为字典,MarkupMachine._add_markup_model
将获取字典以再次实例化模型。因此,我们需要扩展这两种方法。
我会走捷径以保持代码简短并专注于概念。虽然没有强制性的假设。我会假设 a) 你可以处理配置,因为你可以调整它们以便从你的数据库中存储和检索。此外,我将假设 b) 你的机器是你的有状态模型,c) 你使用默认的 model_attribute
'state',d) 你不使用 nested/hierarchical 机器,e) 你触发事件时不要传递重要的自定义信息。最后,f) 你不介意进入恢复状态并且潜在的 on_enter_<callbacks>
将被触发,并且 g) 你不需要毫秒(分数)维度的精度。这听起来很多。但同样,这些都不是交易破坏者,只是需要更复杂的案例处理。
from transitions.extensions.markup import MarkupMachine
from transitions.extensions.states import Timeout
from transitions.core import EventData, Event
import time
from datetime import datetime
class MarkupTimeout(Timeout):
def __init__(self, *args, **kwargs):
# Timeout expects a number but MarkupMachine passes values to states as strings right now
kwargs['timeout'] = int(kwargs.get('timeout', 0))
super(MarkupTimeout, self).__init__(*args, **kwargs)
# we store trigger times in a dictionary with the model id as keys
self.timeout_at = {}
self.timeout = int(self.timeout)
def resume(self, timeout_at, event_data):
# since we want to give our MarkupMachine some time to instantiate we postulate that
# the earliest possible trigger time is in a second.
trigger_time = time.time() + 1
timeout_at = trigger_time if timeout_at < trigger_time else timeout_at
# we store the default timeout time ...
tmp = self.timeout
# ... and temporary override it with the delta of the intended trigger time and the current time
self.timeout = timeout_at - time.time()
# ... enter the state and trigger the creation of the timer
self.enter(event_data)
# restore the timeout for any future enter event
self.timeout = tmp
def enter(self, event_data):
# a timeout will only be initiated if the timeout value is greater than 0
if self.timeout > 0:
# calculate the time when the timeout will trigger (approximately) ...
timeout_time = time.time() + self.timeout
# and store it in the previously created dictionary
self.timeout_at[id(event_data.model)] = timeout_time
print(f"I should timeout at: {datetime.utcfromtimestamp(timeout_time)}")
super(MarkupTimeout, self).enter(event_data)
def exit(self, event_data):
super(MarkupTimeout, self).exit(event_data)
# remove the timeout time when the state is exited
self.timeout_at[id(event_data.model)] = None
class DBMachine(MarkupMachine):
# DBMachine will use this class when states are created
state_cls = MarkupTimeout
# we customize our model definition and add 'timeout_at' to it
# usually MarkupMachine would iterate over all models but since we assume the model is just
# the machine itself, we can skip that part
def _convert_models(self):
state = self.get_state(self.state)
timeout_at = state.timeout_at.get(id(self), None)
model_def = {'state': state.name,
'name': 'DBMachine',
'class-name': 'self',
'timeout_at': str(timeout_at) if timeout_at is not None else ''}
return [model_def]
def _add_markup_model(self, markup):
initial = markup.get('state', None)
timeout_at = markup.get('timeout_at', '')
self.add_model(self, initial)
if timeout_at:
state = self.get_state(self.state)
# as mentioned above, every configuration value is a string right now
ms = float(timeout_at)
# since we did not store event data, we need to create a temporary event with a minimal EventData object
# that can be passed to state callbacks
state.resume(ms, EventData(state=state,
event=Event(name="resume", machine=self),
machine=self,
model=self,
args=[],
kwargs={}))
# we pass a timeout only for 'pending'
states = ['init', dict(name='pending', timeout=5, on_timeout='cancel'), 'done', 'cancelled']
transitions = [
dict(trigger='request', source='init', dest='pending'),
dict(trigger='cancel', source='pending', dest='cancelled'),
dict(trigger='result', source='pending', dest='done')
]
m = DBMachine(states=states, transitions=transitions, initial='init')
# transition to 'pending' and initiate timer
m.request()
assert m.is_pending()
config = m.markup # [1]
# remove old machine
del m
# create new machine from configuration
m2 = DBMachine(markup=config)
assert m2.is_pending()
time.sleep(10)
assert m2.is_cancelled()
配置 [1] 如下所示:
{ 'after_state_change': [],
'auto_transitions': True,
'before_state_change': [],
'finalize_event': [],
'ignore_invalid_triggers': None,
'initial': 'init',
'models': [ { 'class-name': 'self',
'name': 'DBMachine',
'state': 'pending',
'timeout_at': '1617958918.6320097'}],
'prepare_event': [],
'queued': False,
'send_event': False,
'states': [ {'name': 'init'},
{'name': 'pending', 'on_timeout': ['cancel'], 'timeout': '5'},
{'name': 'done'},
{'name': 'cancelled'}],
'transitions': [ {'dest': 'pending', 'source': 'init', 'trigger': 'request'},
{ 'dest': 'cancelled',
'source': 'pending',
'trigger': 'cancel'},
{'dest': 'done', 'source': 'pending', 'trigger': 'result'}]}
我假设可以重新组织此配置以启用 SQL 查询以过滤即将发生的超时并在必要时实例化机器。 timeout_at
也可以存储为日期时间字符串而不是 unix 时间戳,如果这样可以使查询更容易的话。您也可以只存储 models
部分,而不是从配置创建 DBMachine
,而是以 'common' 方式创建它:
# reuse the states and transitions and only create the model from configuration
# 'model=None' prevents the machine from adding itself as a model too early
m2 = DBMachine(model=None, states=states, transitions=transitions, initial='init')
m2._add_markup_model(config['models'][0])
关于我目前在我的一个项目中使用的 pyTransitions 包的问题。
我一开始就在评估不同的软件包时进行了测试,其中包括超时功能,我知道我会在以后的某个地方需要它。
然后我逐渐选择使用 sqlalchemy 将我所有的有限状态机(实际上只是一个 id 和状态)保存到磁盘,并且只在需要触发转换时重新加载它们 - 这非常有效。
不幸的是,现在又需要超时处理,甚至在尝试将其集成到我的代码中之前,我很确定这对我来说不起作用。 我想我说的很明显:为了在一组(可能更大的)fsms 上正确处理超时,它们必须作为活动对象存在于内存中,而不仅仅是从数据库加载?
这是您已经遇到过的用例吗?是否有某种方法也可以访问此超时计数器,以便随时通过适当的修复来持久化和重新加载它,以便允许超时机制重新站起来,即使该对象在 RAM 中不存在实际超时?
如果没有简单的内置替代方案,我想我会在 RAM 中创建一个常驻对象池,定期保留它们并在我的应用程序出现故障时重新加载它们? (我的具体场景使用了 sqlalchemy,但我想这同样适用于 pickle)
提前为任何想法或建议欢呼和感谢 乔尔
目前还没有恢复超时的内置功能。但是,有一些方法可以将机器从(字典)配置转换为(字典)配置。扩展名为 MarkupMachine
并在 FAQ notebook.
我们需要的是一个超时状态 class,它存储有关何时应触发并可根据此信息恢复的信息。我们还需要 MarkupMachine
来处理这些自定义状态信息。 MarkupMachine._convert_models
将模型及其当前状态转换为字典,MarkupMachine._add_markup_model
将获取字典以再次实例化模型。因此,我们需要扩展这两种方法。
我会走捷径以保持代码简短并专注于概念。虽然没有强制性的假设。我会假设 a) 你可以处理配置,因为你可以调整它们以便从你的数据库中存储和检索。此外,我将假设 b) 你的机器是你的有状态模型,c) 你使用默认的 model_attribute
'state',d) 你不使用 nested/hierarchical 机器,e) 你触发事件时不要传递重要的自定义信息。最后,f) 你不介意进入恢复状态并且潜在的 on_enter_<callbacks>
将被触发,并且 g) 你不需要毫秒(分数)维度的精度。这听起来很多。但同样,这些都不是交易破坏者,只是需要更复杂的案例处理。
from transitions.extensions.markup import MarkupMachine
from transitions.extensions.states import Timeout
from transitions.core import EventData, Event
import time
from datetime import datetime
class MarkupTimeout(Timeout):
def __init__(self, *args, **kwargs):
# Timeout expects a number but MarkupMachine passes values to states as strings right now
kwargs['timeout'] = int(kwargs.get('timeout', 0))
super(MarkupTimeout, self).__init__(*args, **kwargs)
# we store trigger times in a dictionary with the model id as keys
self.timeout_at = {}
self.timeout = int(self.timeout)
def resume(self, timeout_at, event_data):
# since we want to give our MarkupMachine some time to instantiate we postulate that
# the earliest possible trigger time is in a second.
trigger_time = time.time() + 1
timeout_at = trigger_time if timeout_at < trigger_time else timeout_at
# we store the default timeout time ...
tmp = self.timeout
# ... and temporary override it with the delta of the intended trigger time and the current time
self.timeout = timeout_at - time.time()
# ... enter the state and trigger the creation of the timer
self.enter(event_data)
# restore the timeout for any future enter event
self.timeout = tmp
def enter(self, event_data):
# a timeout will only be initiated if the timeout value is greater than 0
if self.timeout > 0:
# calculate the time when the timeout will trigger (approximately) ...
timeout_time = time.time() + self.timeout
# and store it in the previously created dictionary
self.timeout_at[id(event_data.model)] = timeout_time
print(f"I should timeout at: {datetime.utcfromtimestamp(timeout_time)}")
super(MarkupTimeout, self).enter(event_data)
def exit(self, event_data):
super(MarkupTimeout, self).exit(event_data)
# remove the timeout time when the state is exited
self.timeout_at[id(event_data.model)] = None
class DBMachine(MarkupMachine):
# DBMachine will use this class when states are created
state_cls = MarkupTimeout
# we customize our model definition and add 'timeout_at' to it
# usually MarkupMachine would iterate over all models but since we assume the model is just
# the machine itself, we can skip that part
def _convert_models(self):
state = self.get_state(self.state)
timeout_at = state.timeout_at.get(id(self), None)
model_def = {'state': state.name,
'name': 'DBMachine',
'class-name': 'self',
'timeout_at': str(timeout_at) if timeout_at is not None else ''}
return [model_def]
def _add_markup_model(self, markup):
initial = markup.get('state', None)
timeout_at = markup.get('timeout_at', '')
self.add_model(self, initial)
if timeout_at:
state = self.get_state(self.state)
# as mentioned above, every configuration value is a string right now
ms = float(timeout_at)
# since we did not store event data, we need to create a temporary event with a minimal EventData object
# that can be passed to state callbacks
state.resume(ms, EventData(state=state,
event=Event(name="resume", machine=self),
machine=self,
model=self,
args=[],
kwargs={}))
# we pass a timeout only for 'pending'
states = ['init', dict(name='pending', timeout=5, on_timeout='cancel'), 'done', 'cancelled']
transitions = [
dict(trigger='request', source='init', dest='pending'),
dict(trigger='cancel', source='pending', dest='cancelled'),
dict(trigger='result', source='pending', dest='done')
]
m = DBMachine(states=states, transitions=transitions, initial='init')
# transition to 'pending' and initiate timer
m.request()
assert m.is_pending()
config = m.markup # [1]
# remove old machine
del m
# create new machine from configuration
m2 = DBMachine(markup=config)
assert m2.is_pending()
time.sleep(10)
assert m2.is_cancelled()
配置 [1] 如下所示:
{ 'after_state_change': [],
'auto_transitions': True,
'before_state_change': [],
'finalize_event': [],
'ignore_invalid_triggers': None,
'initial': 'init',
'models': [ { 'class-name': 'self',
'name': 'DBMachine',
'state': 'pending',
'timeout_at': '1617958918.6320097'}],
'prepare_event': [],
'queued': False,
'send_event': False,
'states': [ {'name': 'init'},
{'name': 'pending', 'on_timeout': ['cancel'], 'timeout': '5'},
{'name': 'done'},
{'name': 'cancelled'}],
'transitions': [ {'dest': 'pending', 'source': 'init', 'trigger': 'request'},
{ 'dest': 'cancelled',
'source': 'pending',
'trigger': 'cancel'},
{'dest': 'done', 'source': 'pending', 'trigger': 'result'}]}
我假设可以重新组织此配置以启用 SQL 查询以过滤即将发生的超时并在必要时实例化机器。 timeout_at
也可以存储为日期时间字符串而不是 unix 时间戳,如果这样可以使查询更容易的话。您也可以只存储 models
部分,而不是从配置创建 DBMachine
,而是以 'common' 方式创建它:
# reuse the states and transitions and only create the model from configuration
# 'model=None' prevents the machine from adding itself as a model too early
m2 = DBMachine(model=None, states=states, transitions=transitions, initial='init')
m2._add_markup_model(config['models'][0])