设计一个系统来集中管理不同系统上的一系列事件
Designing a system to centrally manage series of events on different systems
我在工作中遇到了一个问题,我需要在不同的设备上执行一系列顺序任务。这些设备不需要相互交互,并且每个顺序任务都可以在每个设备上独立执行。
假设我有任务 (A->B->C->D)(例如:A 结束触发 B,B 结束触发 C 等等),设备(dev1、dev2)可以执行这些任务相互独立。
如何设计一个集中式系统,在每个设备上执行每个任务。由于 Infra 限制,我无法使用 Threading 或 Multiprocessing。
我正在寻找一些设计建议 (类) 以及我如何着手设计它。
我想到的第一种方法是蛮力,我盲目地使用循环遍历设备并执行每项任务。
第二种方法我正在阅读有关状态设计模式的信息,但我不确定如何实现它。
编辑:我已经实施了我在下面提供的答案。但是我想知道在状态之间传输信息的正确方法。我知道状态需要相互排斥,但每个任务都需要访问所有资源中共有的某些资源。我该如何构建它?
我可能会尝试使用 Flask 来实现超级简单 api 和设备上的客户端应用程序,这些应用程序可以“汇集”来自中心 api 和 post 结果的数据,以便中心服务器了解进度以及目前使用的是什么。客户端应用程序将是超级简单的睡眠循环,因此它不会 100% cpu 不需要。
我已经使用状态设计模式来处理这个问题。我有一个设备 class,它是具体的 class 并且有一个名为“perform_task”的方法。此方法根据它所处的状态更改行为。在给定点,它可以在 TaskA TaskB 等中。
class Device():
_state = None
def __init__(self):
"""Constructor method"""
self.switch_to(TaskA())
def switch_to(self, state):
self._state = state
self._state.context = self
def perform_task(self):
self._state.perform_task()
然后我有一个 State Abstract class,它有抽象方法。其次是状态 classes 本身。
class State(ABC):
@property
def context(self):
return self._context
@context.setter
def context(self, context):
self._context = context
@abstractmethod
def perform_task(self):
pass
class TaskA():
def perform_task(self):
# Do something
self.context.switch_to(TaskB())
class TaskB():
def perform_task():
# Do something.
pass
这样做我们可以在未来将其扩展到任意数量的州并处理新的情况。
我在工作中遇到了一个问题,我需要在不同的设备上执行一系列顺序任务。这些设备不需要相互交互,并且每个顺序任务都可以在每个设备上独立执行。
假设我有任务 (A->B->C->D)(例如:A 结束触发 B,B 结束触发 C 等等),设备(dev1、dev2)可以执行这些任务相互独立。
如何设计一个集中式系统,在每个设备上执行每个任务。由于 Infra 限制,我无法使用 Threading 或 Multiprocessing。 我正在寻找一些设计建议 (类) 以及我如何着手设计它。 我想到的第一种方法是蛮力,我盲目地使用循环遍历设备并执行每项任务。 第二种方法我正在阅读有关状态设计模式的信息,但我不确定如何实现它。
编辑:我已经实施了我在下面提供的答案。但是我想知道在状态之间传输信息的正确方法。我知道状态需要相互排斥,但每个任务都需要访问所有资源中共有的某些资源。我该如何构建它?
我可能会尝试使用 Flask 来实现超级简单 api 和设备上的客户端应用程序,这些应用程序可以“汇集”来自中心 api 和 post 结果的数据,以便中心服务器了解进度以及目前使用的是什么。客户端应用程序将是超级简单的睡眠循环,因此它不会 100% cpu 不需要。
我已经使用状态设计模式来处理这个问题。我有一个设备 class,它是具体的 class 并且有一个名为“perform_task”的方法。此方法根据它所处的状态更改行为。在给定点,它可以在 TaskA TaskB 等中。
class Device():
_state = None
def __init__(self):
"""Constructor method"""
self.switch_to(TaskA())
def switch_to(self, state):
self._state = state
self._state.context = self
def perform_task(self):
self._state.perform_task()
然后我有一个 State Abstract class,它有抽象方法。其次是状态 classes 本身。
class State(ABC):
@property
def context(self):
return self._context
@context.setter
def context(self, context):
self._context = context
@abstractmethod
def perform_task(self):
pass
class TaskA():
def perform_task(self):
# Do something
self.context.switch_to(TaskB())
class TaskB():
def perform_task():
# Do something.
pass
这样做我们可以在未来将其扩展到任意数量的州并处理新的情况。