python 中的动态 class
Dynamic class in python
这可能是标题错误,但这是我的问题。
我有一个系统,包括一个 微控制器 (MCU)、一个 串行接口 (SPI)、一个 DAC(数字/模拟转换器),一个电极(E)。在我的 python 模型化中,每个元素都被定义为 class。
作为第一步,我想在微控制器中输入一些内容时监视电极上的输出。
让我们考虑以下问题:
- 输入:1 毫秒内电极上的电流为 2 mA。
- MCU 通过 SPI 发送新的 DAC 值:30 us
- DAC 更新其寄存器和输出:400 us
- MCU向电极发送开机命令:1 us
- 电极输出中
- 1ms后,向电极发送关机命令:1us
- 电极不输出了
我的 2 个最大问题是 1.如何考虑这个时间分量和 2.如何监控 SPI 线以确定是否必须做某事。
class Electrode:
def __init__(self, id):
self.id = id
self.switch = False
self.value = 0
def output(self):
if self.switch:
return self.value
else:
return 0
class SPI:
def __init__(self):
self.msg = None
class MCU:
def __init__(self):
self.name = "MicroController"
def send_SPI_msg(self, SPI, msg):
SPI.msg = msg
class DAC:
def __init__(id):
self.id = id
self.cs = 1
self.register = None
self.output = None
def read_SPI_msg(self, SPI):
message = SPI.msg
# update register and output
我的系统实际上有 16 个 DAC 和电极以及一个 field-programmable 门阵列,它们都在听同一个 SPI。我上面描述的是一个相当简化的版本。
问题是:如何让组件定期检查SPI.msg
中的值并据此采取行动?
实际上,每个组件都在发挥自己的作用。因此,动作是并行执行的。由于我正在尝试模拟时间线和执行的操作,因此我不介意使用每个元素的时间线变量(属性)连续执行所有操作。我只是想弄清楚如何让我的 classes 一起互动。
即我不能在 python 中执行以下操作,否则我会卡住:
class DAC:
def __init__(id):
# init
def read_SPI_msg(self, SPI):
while True:
message = SPI.msg
# update register and output if needed
也许可以使用事件触发...但我不知道如何。
也许使用多线程,定义一个线程/元素?
编辑:当前状态:
class SPI:
def __init__(self):
self.attached_dacs = []
self.attached_fpga = []
self.attached_mcu = []
def attach_device(self, device):
if type(device) == DAC:
self.attached_dacs.append(device)
elif type(device) == FPGA:
self.attached_fpga.append(device)
elif type(device) == MCU:
self.attached_mcu.append(device)
def send_message(self, msg):
for device in self.attached_dacs + self.attached_fpga:
device.on_spi_message(self, msg)
class SpiAttachableDevice:
def on_spi_message(self, SPI, message):
if self.cs:
self.execute_SPI_message(message)
else:
return None
class DAC(SpiAttachableDevice):
def __init__(self, id):
self.id = id
self.cs = False # Not listening
def execute_SPI_message(message):
# Do stuff
class FPGA(SpiAttachableDevice):
def __init__(self):
self.electrodes = list()
self.cs = False # Not listening
def execute_SPI_message(message):
# Do stuff
class MCU:
def __init__(self):
self.electrodes = list()
我假设您想保持单线程并且不使用 asyncio。在这种情况下,您可能希望在实现 SPI 时使用 observer 或 pub/sub 模式:
class SPI:
def __init__(self):
self.attached_devices = []
def attach_device(self, device):
self.attached_devices.append(device)
def send_message(self, msg):
for device in self.attached_devices:
device.on_spi_message(self, msg)
class SpiAttachableDevice:
def on_spi_message(self, spi_instance, message):
raise NotImplementedError('subclass me!')
所以你可以这样使用它:
spi = SPI()
device_1 = Device()
device_2 = Device()
spi.attach_device(device_1)
spi.attach_device(device_2)
spi.send_message('hello')
我没有做任何事情来从 Device
对象发送 SPI 消息,但您可以相应地更新抽象。
您可以将 while
循环移到外部:
class SPI:
def __init__(self, msg):
self.msg = msg
class Component:
def __init__(self, spi):
self.spi = spi
def tick(self, t):
msg = self.spi.msg
if msg = "...":
...
spi = SPI()
components = [Component(spi), ...]
for t in range(TOTAL_TIME):
for component in components:
component.tick(t)
正如您在评论中所述,您希望对正在发生的事情有更多的时间轴视图。您可以有一个明确的时间线,您的组件可以与之交互。可以用相同的方式预先设置外部输入(状态变化)。要订购时间线,我每次都会 运行 排序,但使用 priority queue.
之类的东西可能会更高效
这与 Vovanrock2002 答案的主要区别在于不在每个时间步中递归并且具有明确的时间线。
class Component:
def __init__(self, timeline):
self._timeline = timeline
self._out = [] #all connected components
def poke(self, changed_object, time):
return []
class Clock(Component):
def __init__(self, timeline):
Component.__init__(self, timeline)
self._out.append(self)
self.msg = "tick"
self._timeline.append((200, self, msg))
def poke(self, time, changed_object, msg):
self._timeline.append((time + 200, self, self.msg))
timeline = []
spi = SPI(timeline)
components = [spi, Clock(timeline), ComponentA(timeline), ...]
timeline.append((500, spi, "new DAC value"))
while timeline:
timeline.sort(key=lambda event: event[0], reverse=True)
event = timeline.pop()
time, changed_component, msg:
for connected_component in changed_component._out:
connected_component.poke(time, changed_component, msg)
这样你就有了一个明确的时间线(你也可以 "record",只需将每个弹出的事件添加到某个列表中)并且你可以有任意连接的组件(例如,如果你想有多个 SPI)。
这可能是标题错误,但这是我的问题。
我有一个系统,包括一个 微控制器 (MCU)、一个 串行接口 (SPI)、一个 DAC(数字/模拟转换器),一个电极(E)。在我的 python 模型化中,每个元素都被定义为 class。
作为第一步,我想在微控制器中输入一些内容时监视电极上的输出。
让我们考虑以下问题:
- 输入:1 毫秒内电极上的电流为 2 mA。
- MCU 通过 SPI 发送新的 DAC 值:30 us
- DAC 更新其寄存器和输出:400 us
- MCU向电极发送开机命令:1 us
- 电极输出中
- 1ms后,向电极发送关机命令:1us
- 电极不输出了
我的 2 个最大问题是 1.如何考虑这个时间分量和 2.如何监控 SPI 线以确定是否必须做某事。
class Electrode:
def __init__(self, id):
self.id = id
self.switch = False
self.value = 0
def output(self):
if self.switch:
return self.value
else:
return 0
class SPI:
def __init__(self):
self.msg = None
class MCU:
def __init__(self):
self.name = "MicroController"
def send_SPI_msg(self, SPI, msg):
SPI.msg = msg
class DAC:
def __init__(id):
self.id = id
self.cs = 1
self.register = None
self.output = None
def read_SPI_msg(self, SPI):
message = SPI.msg
# update register and output
我的系统实际上有 16 个 DAC 和电极以及一个 field-programmable 门阵列,它们都在听同一个 SPI。我上面描述的是一个相当简化的版本。
问题是:如何让组件定期检查SPI.msg
中的值并据此采取行动?
实际上,每个组件都在发挥自己的作用。因此,动作是并行执行的。由于我正在尝试模拟时间线和执行的操作,因此我不介意使用每个元素的时间线变量(属性)连续执行所有操作。我只是想弄清楚如何让我的 classes 一起互动。
即我不能在 python 中执行以下操作,否则我会卡住:
class DAC:
def __init__(id):
# init
def read_SPI_msg(self, SPI):
while True:
message = SPI.msg
# update register and output if needed
也许可以使用事件触发...但我不知道如何。
也许使用多线程,定义一个线程/元素?
编辑:当前状态:
class SPI:
def __init__(self):
self.attached_dacs = []
self.attached_fpga = []
self.attached_mcu = []
def attach_device(self, device):
if type(device) == DAC:
self.attached_dacs.append(device)
elif type(device) == FPGA:
self.attached_fpga.append(device)
elif type(device) == MCU:
self.attached_mcu.append(device)
def send_message(self, msg):
for device in self.attached_dacs + self.attached_fpga:
device.on_spi_message(self, msg)
class SpiAttachableDevice:
def on_spi_message(self, SPI, message):
if self.cs:
self.execute_SPI_message(message)
else:
return None
class DAC(SpiAttachableDevice):
def __init__(self, id):
self.id = id
self.cs = False # Not listening
def execute_SPI_message(message):
# Do stuff
class FPGA(SpiAttachableDevice):
def __init__(self):
self.electrodes = list()
self.cs = False # Not listening
def execute_SPI_message(message):
# Do stuff
class MCU:
def __init__(self):
self.electrodes = list()
我假设您想保持单线程并且不使用 asyncio。在这种情况下,您可能希望在实现 SPI 时使用 observer 或 pub/sub 模式:
class SPI:
def __init__(self):
self.attached_devices = []
def attach_device(self, device):
self.attached_devices.append(device)
def send_message(self, msg):
for device in self.attached_devices:
device.on_spi_message(self, msg)
class SpiAttachableDevice:
def on_spi_message(self, spi_instance, message):
raise NotImplementedError('subclass me!')
所以你可以这样使用它:
spi = SPI()
device_1 = Device()
device_2 = Device()
spi.attach_device(device_1)
spi.attach_device(device_2)
spi.send_message('hello')
我没有做任何事情来从 Device
对象发送 SPI 消息,但您可以相应地更新抽象。
您可以将 while
循环移到外部:
class SPI:
def __init__(self, msg):
self.msg = msg
class Component:
def __init__(self, spi):
self.spi = spi
def tick(self, t):
msg = self.spi.msg
if msg = "...":
...
spi = SPI()
components = [Component(spi), ...]
for t in range(TOTAL_TIME):
for component in components:
component.tick(t)
正如您在评论中所述,您希望对正在发生的事情有更多的时间轴视图。您可以有一个明确的时间线,您的组件可以与之交互。可以用相同的方式预先设置外部输入(状态变化)。要订购时间线,我每次都会 运行 排序,但使用 priority queue.
之类的东西可能会更高效这与 Vovanrock2002 答案的主要区别在于不在每个时间步中递归并且具有明确的时间线。
class Component:
def __init__(self, timeline):
self._timeline = timeline
self._out = [] #all connected components
def poke(self, changed_object, time):
return []
class Clock(Component):
def __init__(self, timeline):
Component.__init__(self, timeline)
self._out.append(self)
self.msg = "tick"
self._timeline.append((200, self, msg))
def poke(self, time, changed_object, msg):
self._timeline.append((time + 200, self, self.msg))
timeline = []
spi = SPI(timeline)
components = [spi, Clock(timeline), ComponentA(timeline), ...]
timeline.append((500, spi, "new DAC value"))
while timeline:
timeline.sort(key=lambda event: event[0], reverse=True)
event = timeline.pop()
time, changed_component, msg:
for connected_component in changed_component._out:
connected_component.poke(time, changed_component, msg)
这样你就有了一个明确的时间线(你也可以 "record",只需将每个弹出的事件添加到某个列表中)并且你可以有任意连接的组件(例如,如果你想有多个 SPI)。