我将如何使用 simpy 编写带有缓冲区的 2 台机器的模拟?
How would I code a simulation of 2 machines with a buffer using simpy?
我是模拟新手,正在浏览简单的文档。我有点明白了它的要点,但似乎无法真正掌握如何翻译我想要模拟的生产线。
我正在尝试模拟一条具有 m 台机器和 m-1 个缓冲区的生产线。
机器基本一样:
- 每个状态以 s 的速度处理单元
- 状态和时间的随机分布
- 缓冲区中有 starvation/blockage 吗?
缓冲区的工作原理相同:
- 从以前的机器接收单位
- 失去下一台机器的单位
- capacity(t) = capacity(t-1) + (输入上一台机器) - (输出下一台
机)
- 容量(t) <= 最大容量
现在我知道模拟的第一步是小步骤,所以我需要先制作一个简单的模型。我希望创建然后使用 simpy 的是一个具有固定处理速度、故障率和维护速度的 2 机器和 1 缓冲系统:
- 向机器 1 传送了无限容量的单元。
- 机器1以速度s处理,f分钟后出现故障,r分钟后修复。如果缓冲区已满,则机器 1 停止。
- 缓冲区由机器 1 填充并由机器 2 清空。存在最大容量。
- 机器 2 以 min(speed s, capacity buffer) 处理 f 分钟后失败,并在 r 分钟后修复。如果缓冲区为空,则机器 2 停止。
- Machine 2 后无限缓冲容量
编辑:我从@Michael 那里得到的答案非常有效,我试着解决故障和维护问题。机器似乎出现故障并被修复,但在故障时间的倍数(我需要修复)中不断出现故障。我使用的代码如下:
# Machine 1
speed_1 = 2 # Avg. processing time of Machine 1 in minutes
# speed_1_stdev = 0.6 # St. dev. of processing time of Machine 1
MTTF_1 = 10 # Mean time to failure Machine 1
# fail_1 = 1/MTTF_1 # Parameter for exp. distribution
repair_1 = 3 # Time it takes to repair Machine 1
# Machine 2
speed_2 = 3 # Processing time of Machine 2 in minutes
# speed_2_stdev = 0.6 # St. dev. of processing time of Machine 2
MTTF_2 = 7 # Mean time to failure Machine 1
# fail_2 = 1/MTTF_2 # Parameter for exp. distribution
repair_2 = 4 # Time it takes to repair Machine 2
# Simulation time
time = 120 # Sim time in minutes
#---------------------------------------------------------------------
# Class setup for a Machine
class Machine(object):
"""
A machine produces units at a fixed processing speed,
takes units from a store before and puts units into a store after.
Machine has a *name*, a processing speed *speed*, a preceeding buffer *in_q*,
and a proceeding buffer *out_q*.
Next steps:
- Machine produces units at distributed processing speeds.
- A machine fails at fixed intervals and is repaired at a fixed time.
- Failure and repair times are distributed.
"""
def __init__(self, env, name, in_q, out_q, speed, mttf, repair):
self.env = env
self.name = name
self.in_q = in_q
self.out_q = out_q
self.speed = speed
self.mttf = mttf
self.repair = repair
self.broken = False
# Start the producing process
self.process = env.process(self.produce())
# Start the failure process
env.process(self.fail_machine())
def produce(self):
"""
Produce parts as long as the simulation runs.
"""
while True:
part = yield self.in_q.get()
try:
# If want to see time {self.env.now:.2f}
print(f'{self.name} has got a part')
yield env.timeout(self.speed)
if len(self.out_q.items) < self.out_q.capacity:
print(f'{self.name} finish a part next buffer has {len(self.out_q.items)} and capacity of {self.out_q.capacity}')
else:
print(f'{self.env.now:.2f} {self.name} output buffer full!!!')
yield self.out_q.put(part)
print(f'{self.name} pushed part to next buffer')
except simpy.Interrupt:
self.broken = True
yield self.env.timeout(self.repair)
print(f'{self.env.now:.2f} {self.name} is in fixed')
self.broken = False
def fail_machine(self):
"""
The machine is prone to break down every now and then.
"""
while True:
yield self.env.timeout(self.mttf)
print(f'{self.env.now:.2f} {self.name} is in failure.')
if not self.broken:
# Machine only fails if currently working.
self.process.interrupt(self.mttf)
#---------------------------------------------------------------------
# Generating the arrival of parts in the entry buffer to be used by machine 1
def gen_arrivals(env, entry_buffer):
"""
Start the process for each part by putting
the part in the starting buffer
"""
while True:
yield env.timeout(random.uniform(0,0.001))
# print(f'{env.now:.2f} part has arrived')
part = object() # Too lazy to make a real part class, also isn't necessary
yield entry_buffer.put(part)
#---------------------------------------------------------------------
# Create environment and start the setup process
env = simpy.Environment()
bufferStart = simpy.Store(env) # Buffer with unlimited capacity
buffer1 = simpy.Store(env, capacity = 8) # Buffer between machines with limited capacity
bufferEnd = simpy.Store(env) # Last buffer with unlimited capacity
# The machines __init__ starts the machine process so no env.process() is needed here
machine_1 = Machine(env, 'Machine 1', bufferStart, buffer1, speed_1, MTTF_1, repair_1)
machine_2 = Machine(env, 'Machine 2', buffer1, bufferEnd, speed_2, MTTF_2, repair_2)
env.process(gen_arrivals(env, bufferStart))
# Execute
env.run(until = time)
我使用 simmpy.Store 作为缓冲区。你可以为商店设置最大容量,我认为默认是无限的。当您执行 yield my_store.put() 时,如果商店已满,它将阻塞。此外,当您执行 yield my_store.get() 时,如果存储为空,它将阻塞。
您需要制作一台机器class。对于第一个版本,我将跳过细分。机器应该有一个无限循环的进程,从它的输入缓冲区执行 get(),延迟一些时间(yield env.timeout()),然后执行 put() 将零件放入下一台机器的输入缓冲。第一个版本应该是这样的:一个生成到达部分的过程,这些部分被放入机器 1 输入缓冲区,机器 1 从其输入缓冲区中拉出 (get()) 并将 (put()) 推送到机器 2 输入缓冲区,一个机器 2 从其输入缓冲区中拉取 (get()) 并将 (put()) 推送到退出缓冲区缓冲区。所以这就是 1 生成到达,2 台机器(可以是相同的 class)和 3 simpy.Store s
试一试。如果您仍然需要示例代码,我会在本周末尝试。今天和明天有真正的工作要做。
祝你好运
厌倦了我的工作,所以我做了一些小改动来修复你的代码。我认为它甚至有效。最后一点,不止一台机器可以使用相同的缓冲区。因此一组机器可以处理同一个缓冲区。这就是我所做的,而不是将 simpy.resurce 用于资源池
import simpy
import random
speed_1 = 3 # Avg. processing time of Machine 1 in minutes
speed_2 = 4 # Processing time of Machine 2 in minutes
time = 120 # Sim time in minutes
# Class setup for Machine 1
# This needs to be done as there are varying processing speeds
class Machine(object):
"""
A machine produces units at a fixed processing speed,
takes units from a store before and puts units into a store after.
Machine has a *name*
Next steps:
- Machine produces units at distributed processing speeds.
- A machine fails at fixed intervals and is repaired at a fixed time.
- Failure and repair times are distributed.
"""
def __init__(self, env, name, in_q, out_q, speed):
self.env = env
self.name = name
self.in_q = in_q
self.out_q = out_q
self.speed = speed
# Start the producing process
self.process = env.process(self.produce())
def produce(self):
"""
Produce parts as long as the simulation runs.
"""
while True:
part = yield self.in_q.get()
print(f'{self.env.now:.2f} Machine {self.name} has got a part')
yield env.timeout(self.speed)
print(f'{self.env.now:.2f} Machine {self.name} finish a part next q has {len(self.out_q.items)} and capacit of {self.out_q.capacity}')
yield self.out_q.put(part)
print(f'{self.env.now:.2f} Machine {self.name} pushed part to next buffer')
def gen_arrivals(env, entry_buffer):
"""
start the process for each part by putting
the part in the starting buffer
"""
while True:
yield env.timeout(random.uniform(1,4))
print(f'{env.now:.2f} part has arrived')
part = object() # too lazy to make a real part class
yield entry_buffer.put(part)
# Create environment and start the setup process
env = simpy.Environment()
bufferStart = simpy.Store(env) # Buffer with unlimited capacity
buffer1 = simpy.Store(env, capacity = 6) # Buffer between machines with limited capacity
bufferEnd = simpy.Store(env) # Last buffer with unlimited capacity
# the machines __init__ starts the machine process so no env.process() is needed here
machine_1 = Machine(env, 'Machine 1', bufferStart, buffer1, speed_1)
machine_2 = Machine(env, 'Machine 2', buffer1, bufferEnd, speed_2)
#machine_3 = Machine(env, 'Machine 3', buffer1, bufferEnd, speed_2)
env.process(gen_arrivals(env, bufferStart))
# Execute
env.run(until = time)
我是模拟新手,正在浏览简单的文档。我有点明白了它的要点,但似乎无法真正掌握如何翻译我想要模拟的生产线。
我正在尝试模拟一条具有 m 台机器和 m-1 个缓冲区的生产线。
机器基本一样:
- 每个状态以 s 的速度处理单元
- 状态和时间的随机分布
- 缓冲区中有 starvation/blockage 吗?
缓冲区的工作原理相同:
- 从以前的机器接收单位
- 失去下一台机器的单位
- capacity(t) = capacity(t-1) + (输入上一台机器) - (输出下一台 机)
- 容量(t) <= 最大容量
现在我知道模拟的第一步是小步骤,所以我需要先制作一个简单的模型。我希望创建然后使用 simpy 的是一个具有固定处理速度、故障率和维护速度的 2 机器和 1 缓冲系统:
- 向机器 1 传送了无限容量的单元。
- 机器1以速度s处理,f分钟后出现故障,r分钟后修复。如果缓冲区已满,则机器 1 停止。
- 缓冲区由机器 1 填充并由机器 2 清空。存在最大容量。
- 机器 2 以 min(speed s, capacity buffer) 处理 f 分钟后失败,并在 r 分钟后修复。如果缓冲区为空,则机器 2 停止。
- Machine 2 后无限缓冲容量
编辑:我从@Michael 那里得到的答案非常有效,我试着解决故障和维护问题。机器似乎出现故障并被修复,但在故障时间的倍数(我需要修复)中不断出现故障。我使用的代码如下:
# Machine 1
speed_1 = 2 # Avg. processing time of Machine 1 in minutes
# speed_1_stdev = 0.6 # St. dev. of processing time of Machine 1
MTTF_1 = 10 # Mean time to failure Machine 1
# fail_1 = 1/MTTF_1 # Parameter for exp. distribution
repair_1 = 3 # Time it takes to repair Machine 1
# Machine 2
speed_2 = 3 # Processing time of Machine 2 in minutes
# speed_2_stdev = 0.6 # St. dev. of processing time of Machine 2
MTTF_2 = 7 # Mean time to failure Machine 1
# fail_2 = 1/MTTF_2 # Parameter for exp. distribution
repair_2 = 4 # Time it takes to repair Machine 2
# Simulation time
time = 120 # Sim time in minutes
#---------------------------------------------------------------------
# Class setup for a Machine
class Machine(object):
"""
A machine produces units at a fixed processing speed,
takes units from a store before and puts units into a store after.
Machine has a *name*, a processing speed *speed*, a preceeding buffer *in_q*,
and a proceeding buffer *out_q*.
Next steps:
- Machine produces units at distributed processing speeds.
- A machine fails at fixed intervals and is repaired at a fixed time.
- Failure and repair times are distributed.
"""
def __init__(self, env, name, in_q, out_q, speed, mttf, repair):
self.env = env
self.name = name
self.in_q = in_q
self.out_q = out_q
self.speed = speed
self.mttf = mttf
self.repair = repair
self.broken = False
# Start the producing process
self.process = env.process(self.produce())
# Start the failure process
env.process(self.fail_machine())
def produce(self):
"""
Produce parts as long as the simulation runs.
"""
while True:
part = yield self.in_q.get()
try:
# If want to see time {self.env.now:.2f}
print(f'{self.name} has got a part')
yield env.timeout(self.speed)
if len(self.out_q.items) < self.out_q.capacity:
print(f'{self.name} finish a part next buffer has {len(self.out_q.items)} and capacity of {self.out_q.capacity}')
else:
print(f'{self.env.now:.2f} {self.name} output buffer full!!!')
yield self.out_q.put(part)
print(f'{self.name} pushed part to next buffer')
except simpy.Interrupt:
self.broken = True
yield self.env.timeout(self.repair)
print(f'{self.env.now:.2f} {self.name} is in fixed')
self.broken = False
def fail_machine(self):
"""
The machine is prone to break down every now and then.
"""
while True:
yield self.env.timeout(self.mttf)
print(f'{self.env.now:.2f} {self.name} is in failure.')
if not self.broken:
# Machine only fails if currently working.
self.process.interrupt(self.mttf)
#---------------------------------------------------------------------
# Generating the arrival of parts in the entry buffer to be used by machine 1
def gen_arrivals(env, entry_buffer):
"""
Start the process for each part by putting
the part in the starting buffer
"""
while True:
yield env.timeout(random.uniform(0,0.001))
# print(f'{env.now:.2f} part has arrived')
part = object() # Too lazy to make a real part class, also isn't necessary
yield entry_buffer.put(part)
#---------------------------------------------------------------------
# Create environment and start the setup process
env = simpy.Environment()
bufferStart = simpy.Store(env) # Buffer with unlimited capacity
buffer1 = simpy.Store(env, capacity = 8) # Buffer between machines with limited capacity
bufferEnd = simpy.Store(env) # Last buffer with unlimited capacity
# The machines __init__ starts the machine process so no env.process() is needed here
machine_1 = Machine(env, 'Machine 1', bufferStart, buffer1, speed_1, MTTF_1, repair_1)
machine_2 = Machine(env, 'Machine 2', buffer1, bufferEnd, speed_2, MTTF_2, repair_2)
env.process(gen_arrivals(env, bufferStart))
# Execute
env.run(until = time)
我使用 simmpy.Store 作为缓冲区。你可以为商店设置最大容量,我认为默认是无限的。当您执行 yield my_store.put() 时,如果商店已满,它将阻塞。此外,当您执行 yield my_store.get() 时,如果存储为空,它将阻塞。
您需要制作一台机器class。对于第一个版本,我将跳过细分。机器应该有一个无限循环的进程,从它的输入缓冲区执行 get(),延迟一些时间(yield env.timeout()),然后执行 put() 将零件放入下一台机器的输入缓冲。第一个版本应该是这样的:一个生成到达部分的过程,这些部分被放入机器 1 输入缓冲区,机器 1 从其输入缓冲区中拉出 (get()) 并将 (put()) 推送到机器 2 输入缓冲区,一个机器 2 从其输入缓冲区中拉取 (get()) 并将 (put()) 推送到退出缓冲区缓冲区。所以这就是 1 生成到达,2 台机器(可以是相同的 class)和 3 simpy.Store s
试一试。如果您仍然需要示例代码,我会在本周末尝试。今天和明天有真正的工作要做。
祝你好运
厌倦了我的工作,所以我做了一些小改动来修复你的代码。我认为它甚至有效。最后一点,不止一台机器可以使用相同的缓冲区。因此一组机器可以处理同一个缓冲区。这就是我所做的,而不是将 simpy.resurce 用于资源池
import simpy
import random
speed_1 = 3 # Avg. processing time of Machine 1 in minutes
speed_2 = 4 # Processing time of Machine 2 in minutes
time = 120 # Sim time in minutes
# Class setup for Machine 1
# This needs to be done as there are varying processing speeds
class Machine(object):
"""
A machine produces units at a fixed processing speed,
takes units from a store before and puts units into a store after.
Machine has a *name*
Next steps:
- Machine produces units at distributed processing speeds.
- A machine fails at fixed intervals and is repaired at a fixed time.
- Failure and repair times are distributed.
"""
def __init__(self, env, name, in_q, out_q, speed):
self.env = env
self.name = name
self.in_q = in_q
self.out_q = out_q
self.speed = speed
# Start the producing process
self.process = env.process(self.produce())
def produce(self):
"""
Produce parts as long as the simulation runs.
"""
while True:
part = yield self.in_q.get()
print(f'{self.env.now:.2f} Machine {self.name} has got a part')
yield env.timeout(self.speed)
print(f'{self.env.now:.2f} Machine {self.name} finish a part next q has {len(self.out_q.items)} and capacit of {self.out_q.capacity}')
yield self.out_q.put(part)
print(f'{self.env.now:.2f} Machine {self.name} pushed part to next buffer')
def gen_arrivals(env, entry_buffer):
"""
start the process for each part by putting
the part in the starting buffer
"""
while True:
yield env.timeout(random.uniform(1,4))
print(f'{env.now:.2f} part has arrived')
part = object() # too lazy to make a real part class
yield entry_buffer.put(part)
# Create environment and start the setup process
env = simpy.Environment()
bufferStart = simpy.Store(env) # Buffer with unlimited capacity
buffer1 = simpy.Store(env, capacity = 6) # Buffer between machines with limited capacity
bufferEnd = simpy.Store(env) # Last buffer with unlimited capacity
# the machines __init__ starts the machine process so no env.process() is needed here
machine_1 = Machine(env, 'Machine 1', bufferStart, buffer1, speed_1)
machine_2 = Machine(env, 'Machine 2', buffer1, bufferEnd, speed_2)
#machine_3 = Machine(env, 'Machine 3', buffer1, bufferEnd, speed_2)
env.process(gen_arrivals(env, bufferStart))
# Execute
env.run(until = time)