如何并行迭代 SimPy 模拟?

How can I iterate a SimPy simulation in parallel?

我有一个 SimPy 模型,该模型 returns 是一个随机结果,我想将其复制多次。每个复制都是独立的,所以为了让它更快,我想 运行 它们并行。我试过 Python 的 multiprocessing, Pathos multiprocessing, and joblib Parallel,但每种方法都出现相同的错误:TypeError: can't pickle generator objects。有什么方法可以避免此错误并运行并行模拟?

SimPy 依赖于生成器,如 here 所述,因此无法避免它们。

错误很好地描述了问题。在您发送给子进程的对象中的某处,生成器潜伏着,大概在函数参数中。是否可以将此生成器转换为列表?

例如,以下会引发您提到的错误:

from multiprocessing import Pool

def firstn(n):
    k = 0
    while k < n:
        yield k
        k += 1

if __name__ == "__main__":
    p = Pool(2)
    print(p.map(firstn, [1, 2, 3, 4]))

但是这个有效:

from multiprocessing import Pool

def firstn(n):
    k = 0
    while k < n:
        yield k
        k += 1

def wrapped(n):
    return list(firstn(n))

if __name__ == "__main__":
    p = Pool(2)
    print(p.map(wrapped, [1, 2, 3, 4]))

您需要在新进程中从头开始实例化环境,并注意仅使用原始类型作为要在Pool 中映射的参数。这是一个重新设计的洗车示例(来自 simpy 文档),它使用不同的种子运行 4 次并行模拟,并打印每种情况下洗车的数量。

import multiprocessing as mp
import simpy
import random


NUM_MACHINES = 2  # Number of machines in the carwash
WASHTIME = 5      # Minutes it takes to clean a car
T_INTER = 7       # Create a car every ~7 minutes
SIM_TIME = 20     # Simulation time in minutes


class Carwash(object):
    """A carwash has a limited number of machines (``NUM_MACHINES``) to
    clean cars in parallel.

    Cars have to request one of the machines. When they got one, they
    can start the washing processes and wait for it to finish (which
    takes ``washtime`` minutes).

    """
    def __init__(self, env, num_machines, washtime):
        self.env = env
        self.machine = simpy.Resource(env, num_machines)
        self.washtime = washtime

    def wash(self, car):
        """The washing processes. It takes a ``car`` processes and tries
        to clean it."""
        yield self.env.timeout(WASHTIME)


def car(env, name, cw):
    """The car process (each car has a ``name``) arrives at the carwash
    (``cw``) and requests a cleaning machine.

    It then starts the washing process, waits for it to finish and
    leaves to never come back ...

    """
    with cw.machine.request() as request:
        yield request
        yield env.process(cw.wash(name))


def setup(env, num_machines, washtime, t_inter):
    """Create a carwash, a number of initial cars and keep creating cars
    approx. every ``t_inter`` minutes."""
    # Create the carwash
    carwash = Carwash(env, num_machines, washtime)

    # Create 4 initial cars
    for i in range(4):
        env.process(car(env, 'Car %d' % i, carwash))

    # Create more cars while the simulation is running
    while True:
        yield env.timeout(random.randint(t_inter - 5, t_inter + 5))
        i += 1
        env.i = i
        env.process(car(env, 'Car %d' % i, carwash))


# additional wrapping function to be executed by the pool
def do_simulation_with_seed(rs):

    random.seed(rs)  # This influences only the specific process being run
    env = simpy.Environment()  # THE ENVIRONMENT IS CREATED HERE, IN THE CHILD PROCESS
    env.process(setup(env, NUM_MACHINES, WASHTIME, T_INTER))

    env.run(until=SIM_TIME)

    return env.i


if __name__ == '__main__':
    seeds = range(4)
    carwash_pool = mp.Pool(4)
    ncars_by_seed = carwash_pool.map(do_simulation_with_seed, seeds)
    for s, ncars in zip(seeds, ncars_by_seed):
        print('seed={} --> {} cars washed'.format(s, ncars))