如何并行迭代 SimPy 模拟?
How can I iterate a SimPy simulation in parallel?
我有一个 SimPy 模型,它返回一个随机结果,我想把它重复运行多次。每次重复都是相互独立的,所以为了加快速度,我想并行运行它们。我试过 Python 的 multiprocessing、Pathos 的 multiprocessing 以及 joblib 的 Parallel,但每种方法都出现相同的错误:TypeError: can't pickle generator objects。有什么方法可以避免此错误并并行运行模拟?
SimPy 依赖于生成器(如此处所述),因此无法避免使用它们。
这个错误信息已经很好地说明了问题:在您发送给子进程的对象中,某处藏着一个生成器,很可能是在函数参数中。能否把这个生成器转换成列表?
例如,下面的代码会引发您提到的错误:
from multiprocessing import Pool
def firstn(n):
    """Yield 0, 1, 2, ... for as long as the value is strictly below ``n``."""
    current = 0
    while True:
        if not current < n:
            return
        yield current
        current += 1
if __name__ == "__main__":
    # NOTE: this example is expected to fail. ``firstn`` returns a generator,
    # and a generator cannot be pickled to be sent back from the worker.
    p = Pool(2)
    try:
        print(p.map(firstn, [1, 2, 3, 4]))
    finally:
        # Always release the worker processes, even when ``map`` raises.
        p.close()
        p.join()
而下面这个版本可以正常运行:
from multiprocessing import Pool
def firstn(n):
    """Yield 0, 1, 2, ... for as long as the value is strictly below ``n``."""
    current = 0
    while True:
        if not current < n:
            return
        yield current
        current += 1
def wrapped(n):
    """Exhaust ``firstn(n)`` and return its items as a list.

    Unlike the generator itself, the list can be returned from a function
    mapped with ``Pool.map``.
    """
    collected = []
    collected.extend(firstn(n))
    return collected
if __name__ == "__main__":
    p = Pool(2)
    try:
        # ``wrapped`` returns plain lists, which can cross process boundaries.
        print(p.map(wrapped, [1, 2, 3, 4]))
    finally:
        # Always release the worker processes, even when ``map`` raises.
        p.close()
        p.join()
您需要在新进程中从头开始实例化环境,并注意只使用基本类型作为传给 Pool 进行映射的参数。下面是一个改写过的洗车示例(来自 SimPy 文档),它使用不同的随机种子并行运行 4 次模拟,并打印每种情况下的洗车数量。
import multiprocessing as mp
import simpy
import random
# Simulation parameters
NUM_MACHINES = 2  # machines available in the carwash
WASHTIME = 5  # minutes needed to clean one car
T_INTER = 7  # a new car is created roughly every 7 minutes
SIM_TIME = 20  # total simulated time, in minutes
class Carwash(object):
    """A carwash with a limited number of machines that clean cars in parallel.

    Cars have to request one of the machines. Once they hold one, they can
    start the washing process and wait for it to finish, which takes
    ``washtime`` minutes.
    """

    def __init__(self, env, num_machines, washtime):
        """Store the environment and create the shared machine resource.

        Args:
            env: Simulation environment the carwash belongs to.
            num_machines: Capacity passed to ``simpy.Resource``.
            washtime: Delay handed to ``env.timeout`` for every wash.
        """
        self.env = env
        self.machine = simpy.Resource(env, num_machines)
        self.washtime = washtime

    def wash(self, car):
        """Washing process: yield one timeout lasting ``self.washtime``.

        Args:
            car: Identifier of the car being cleaned (not used in the body).
        """
        # Use the per-instance duration so the ``washtime`` constructor
        # argument is honored instead of the module-level constant.
        yield self.env.timeout(self.washtime)
def car(env, name, cw):
    """Car process: request a machine at ``cw``, get washed, then leave.

    Args:
        env: Simulation environment used to start the washing process.
        name: Name of the car, forwarded to ``cw.wash``.
        cw: Carwash providing ``machine.request()`` and ``wash()``.
    """
    machine_request = cw.machine.request()
    with machine_request as granted:
        # Wait until a machine is handed to this car.
        yield granted

        # Start the washing process and wait for it to complete.
        washing = env.process(cw.wash(name))
        yield washing
def setup(env, num_machines, washtime, t_inter):
    """Create a carwash, start four initial cars, then keep creating cars.

    This is a generator meant to be registered with ``env.process``. After
    the initial cars, a new car is created every
    ``random.randint(t_inter - 5, t_inter + 5)`` time units. The index of the
    most recently created car is published as ``env.i`` so the caller can
    read it once the run has finished.

    Args:
        env: Simulation environment that schedules the processes.
        num_machines: Number of machines given to the ``Carwash``.
        washtime: Wash duration given to the ``Carwash``.
        t_inter: Centre of the random inter-arrival interval.
    """
    # Create the carwash
    carwash = Carwash(env, num_machines, washtime)

    # Create 4 initial cars
    initial_cars = 4
    for index in range(initial_cars):
        env.process(car(env, 'Car %d' % index, carwash))

    # Track the counter explicitly rather than through the leaked loop
    # variable, and publish it before waiting for the first arrival so that
    # ``env.i`` exists even if the run ends before another car is created.
    i = initial_cars - 1
    env.i = i

    # Create more cars while the simulation is running
    while True:
        yield env.timeout(random.randint(t_inter - 5, t_inter + 5))
        i += 1
        env.i = i
        env.process(car(env, 'Car %d' % i, carwash))
# Wrapper function executed by each worker of the pool
def do_simulation_with_seed(rs):
    """Run one carwash simulation seeded with ``rs`` and return ``env.i``.

    The environment is built inside this function, so it is created in the
    process that executes it and never has to be pickled.
    """
    # Seeding only affects the process this call runs in.
    random.seed(rs)

    sim_env = simpy.Environment()
    arrivals = setup(sim_env, NUM_MACHINES, WASHTIME, T_INTER)
    sim_env.process(arrivals)
    sim_env.run(until=SIM_TIME)

    return sim_env.i
if __name__ == '__main__':
    seeds = range(4)
    carwash_pool = mp.Pool(4)
    try:
        # One independent simulation per seed, each in a worker process.
        ncars_by_seed = carwash_pool.map(do_simulation_with_seed, seeds)
    finally:
        # Always release the worker processes, even when ``map`` raises.
        carwash_pool.close()
        carwash_pool.join()
    for s, ncars in zip(seeds, ncars_by_seed):
        print('seed={} --> {} cars washed'.format(s, ncars))
我有一个 SimPy 模型,它返回一个随机结果,我想把它重复运行多次。每次重复都是相互独立的,所以为了加快速度,我想并行运行它们。我试过 Python 的 multiprocessing、Pathos 的 multiprocessing 以及 joblib 的 Parallel,但每种方法都出现相同的错误:TypeError: can't pickle generator objects。有什么方法可以避免此错误并并行运行模拟?
SimPy 依赖于生成器(如此处所述),因此无法避免使用它们。
这个错误信息已经很好地说明了问题:在您发送给子进程的对象中,某处藏着一个生成器,很可能是在函数参数中。能否把这个生成器转换成列表?
例如,下面的代码会引发您提到的错误:
from multiprocessing import Pool
def firstn(n):
    """Yield 0, 1, 2, ... for as long as the value is strictly below ``n``."""
    current = 0
    while True:
        if not current < n:
            return
        yield current
        current += 1
if __name__ == "__main__":
    # NOTE: this example is expected to fail. ``firstn`` returns a generator,
    # and a generator cannot be pickled to be sent back from the worker.
    p = Pool(2)
    try:
        print(p.map(firstn, [1, 2, 3, 4]))
    finally:
        # Always release the worker processes, even when ``map`` raises.
        p.close()
        p.join()
而下面这个版本可以正常运行:
from multiprocessing import Pool
def firstn(n):
    """Yield 0, 1, 2, ... for as long as the value is strictly below ``n``."""
    current = 0
    while True:
        if not current < n:
            return
        yield current
        current += 1
def wrapped(n):
    """Exhaust ``firstn(n)`` and return its items as a list.

    Unlike the generator itself, the list can be returned from a function
    mapped with ``Pool.map``.
    """
    collected = []
    collected.extend(firstn(n))
    return collected
if __name__ == "__main__":
    p = Pool(2)
    try:
        # ``wrapped`` returns plain lists, which can cross process boundaries.
        print(p.map(wrapped, [1, 2, 3, 4]))
    finally:
        # Always release the worker processes, even when ``map`` raises.
        p.close()
        p.join()
您需要在新进程中从头开始实例化环境,并注意只使用基本类型作为传给 Pool 进行映射的参数。下面是一个改写过的洗车示例(来自 SimPy 文档),它使用不同的随机种子并行运行 4 次模拟,并打印每种情况下的洗车数量。
import multiprocessing as mp
import simpy
import random
# Simulation parameters
NUM_MACHINES = 2  # machines available in the carwash
WASHTIME = 5  # minutes needed to clean one car
T_INTER = 7  # a new car is created roughly every 7 minutes
SIM_TIME = 20  # total simulated time, in minutes
class Carwash(object):
    """A carwash with a limited number of machines that clean cars in parallel.

    Cars have to request one of the machines. Once they hold one, they can
    start the washing process and wait for it to finish, which takes
    ``washtime`` minutes.
    """

    def __init__(self, env, num_machines, washtime):
        """Store the environment and create the shared machine resource.

        Args:
            env: Simulation environment the carwash belongs to.
            num_machines: Capacity passed to ``simpy.Resource``.
            washtime: Delay handed to ``env.timeout`` for every wash.
        """
        self.env = env
        self.machine = simpy.Resource(env, num_machines)
        self.washtime = washtime

    def wash(self, car):
        """Washing process: yield one timeout lasting ``self.washtime``.

        Args:
            car: Identifier of the car being cleaned (not used in the body).
        """
        # Use the per-instance duration so the ``washtime`` constructor
        # argument is honored instead of the module-level constant.
        yield self.env.timeout(self.washtime)
def car(env, name, cw):
    """Car process: request a machine at ``cw``, get washed, then leave.

    Args:
        env: Simulation environment used to start the washing process.
        name: Name of the car, forwarded to ``cw.wash``.
        cw: Carwash providing ``machine.request()`` and ``wash()``.
    """
    machine_request = cw.machine.request()
    with machine_request as granted:
        # Wait until a machine is handed to this car.
        yield granted

        # Start the washing process and wait for it to complete.
        washing = env.process(cw.wash(name))
        yield washing
def setup(env, num_machines, washtime, t_inter):
    """Create a carwash, start four initial cars, then keep creating cars.

    This is a generator meant to be registered with ``env.process``. After
    the initial cars, a new car is created every
    ``random.randint(t_inter - 5, t_inter + 5)`` time units. The index of the
    most recently created car is published as ``env.i`` so the caller can
    read it once the run has finished.

    Args:
        env: Simulation environment that schedules the processes.
        num_machines: Number of machines given to the ``Carwash``.
        washtime: Wash duration given to the ``Carwash``.
        t_inter: Centre of the random inter-arrival interval.
    """
    # Create the carwash
    carwash = Carwash(env, num_machines, washtime)

    # Create 4 initial cars
    initial_cars = 4
    for index in range(initial_cars):
        env.process(car(env, 'Car %d' % index, carwash))

    # Track the counter explicitly rather than through the leaked loop
    # variable, and publish it before waiting for the first arrival so that
    # ``env.i`` exists even if the run ends before another car is created.
    i = initial_cars - 1
    env.i = i

    # Create more cars while the simulation is running
    while True:
        yield env.timeout(random.randint(t_inter - 5, t_inter + 5))
        i += 1
        env.i = i
        env.process(car(env, 'Car %d' % i, carwash))
# Wrapper function executed by each worker of the pool
def do_simulation_with_seed(rs):
    """Run one carwash simulation seeded with ``rs`` and return ``env.i``.

    The environment is built inside this function, so it is created in the
    process that executes it and never has to be pickled.
    """
    # Seeding only affects the process this call runs in.
    random.seed(rs)

    sim_env = simpy.Environment()
    arrivals = setup(sim_env, NUM_MACHINES, WASHTIME, T_INTER)
    sim_env.process(arrivals)
    sim_env.run(until=SIM_TIME)

    return sim_env.i
if __name__ == '__main__':
    seeds = range(4)
    carwash_pool = mp.Pool(4)
    try:
        # One independent simulation per seed, each in a worker process.
        ncars_by_seed = carwash_pool.map(do_simulation_with_seed, seeds)
    finally:
        # Always release the worker processes, even when ``map`` raises.
        carwash_pool.close()
        carwash_pool.join()
    for s, ncars in zip(seeds, ncars_by_seed):
        print('seed={} --> {} cars washed'.format(s, ncars))