monte carlo python 中的模拟:使用 ipyparallel 的并行化比序列化花费更长的时间

monte carlo simulation in python: parallelization using ipyparallel taking longer time than serialization

嘿,

我正在 monte carlo 模拟散射介质中的光子传输。我正在尝试将其并行化,但很难观察到与串行模拟相比 运行ning 时间的任何性能改进

可以在下面找到蒙特卡洛代码。 class Photon 包含用于计算单个光子的传输和散射的各种方法,而 class RunPhotonPackage 运行 对于给定厚度 L 的散射介质,一系列 N 个光子。这些是目前我唯一的输入参数:

import matplotlib.pyplot as plt
import numpy as np
from numpy.random import random as rand


NPHOTONS = 100000 # Nb photons
PI  = np.pi
EPS = 1.e-6
L = 100. # scattering layer thickness

class Photon():

    mut = 0.02 
    k = [0,0,1]

    def __init__(self,ko,pos):
        Photon.k = ko
        self.x = pos[0]
        self.y = pos[1]
        self.z = pos[2]

    def move(self):
        ksi = rand(1)
        s = -np.log(1-ksi)/Photon.mut

        self.x = self.x + s*Photon.k[0]
        self.y = self.y + s*Photon.k[1]
        self.z = self.z + s*Photon.k[2]       
        zPos = self.z
        return zPos 

    def exittop(self):

        newZpos = 0


    def exitbase(self):
        newZpos = 0


    def HG(self,g):
        rand_teta = rand(1)
        costeta = 0.5*(1+g**2-((1-g**2)/(1-g + 2.*g*rand_teta))**2)/g

        return costeta

    def scatter(self):
        # calculate new angle of scattering
        phi = 2*PI*rand(1)                
        costeta = self.HG(0.85)
        sinteta = (1-costeta**2)**0.5 


        sinphi = np.sin(phi) 
        cosphi = np.cos(phi)

        temp = (1-Photon.k[2]**2)**0.5

        if np.abs(temp) > EPS:        

            mux = sinteta*(Photon.k[0]*Photon.k[2]*cosphi-Photon.k[1]*sinphi)/temp + Photon.k[0]*costeta 
            muy = sinteta*(Photon.k[1]*Photon.k[2]*cosphi+Photon.k[0]*sinphi)/temp + Photon.k[1]*costeta
            muz = -sinteta*cosphi*temp + Photon.k[2]*costeta

        else:
            mux = sinteta*cosphi 
            muy = sinteta*sinphi
            if Photon.k[2]>=0:
                muz = costeta
            else:
                muz = -costeta


        # update the new direction of the photon 
        Photon.k[0] = mux
        Photon.k[1] = muy
        Photon.k[2] = muz        


class RunPhotonPackage():

    def __init__(self,L,NPHOTONS):
        self.L = L
        self.NPHOTONS = NPHOTONS

    def RunPhoton(self):
        Dist_Pos = np.zeros((3,self.NPHOTONS))
        # loop over number of photon
        for i in range(self.NPHOTONS):

            # inititate initial photon direction
            k_init = [0,0,1]
            k_init_norm = k_init/np.linalg.norm(k_init) # initial photon direction.
            # initiate new photon with initial direction   
            pos_init = [0,0,0]
            newPhoton = Photon(k_init_norm,pos_init)
            newZpos = 0.

            # while the photon is still in the layer, move it and scatter it
            while ((newZpos >= 0.) and (newZpos <= self.L)):

                newZpos = newPhoton.move()
                newscatter = newPhoton.scatter()

            Dist_Pos[0,i] = newPhoton.x
            Dist_Pos[1,i] = newPhoton.y
            Dist_Pos[2,i] = newPhoton.z


        return Dist_Pos

我运行下面的序列代码记录了不同层厚长度和给定光子数的位置直方图。

import time
tic = time.time()
dictresult = {}
for L in np.arange(10,100,10):
    print('L={0} m'.format(L))
    Dist_Pos = RunPhotonPackage(L,10000).RunPhoton()
    dictresult['{0}'.format(L)]=Dist_Pos
toc = time.time()
print('sec Elapsed: {0}s'.format(toc-tic))

然后这个 运行 在:

sec Elapsed: 26.425330162s

当我尝试使用 ipyparallel 并行化代码时:

import ipyparallel
clients = ipyparallel.Client()
clients.ids
dview = clients[:]

dview.execute('import numpy as np')
dview.execute('from numpy.random import random as rand')
dview['PI'] = np.pi
dview['EPS']= 1.e-6

dview.push({"Photon": Photon, "RunPhotonPackage": RunPhotonPackage})

def RunPhotonPara(L):
    LayerL = RunPhotonPackage(L,10000)
    dPos = LayerL.RunPhoton()
    return dPos

tic = time.time()
dictresultpara = []
for L in np.arange(10,100,10):
    print('L={0}'.format(L))
    value = dview.apply_async(RunPhotonPara,L)
    dictresultpara.append(value)
    clients.wait(dictresultpara)
toc = time.time()
print('sec Elapsed: {0}s'.format(toc-tic))

它 运行 在:

sec Elapsed: 55.4289810658s

所以时间增加了一倍多!!!我在 ubuntu 32 位四核上 运行 宁此,并使用 ipcluster start -n 4 在本地主机上启动一个控制器和 4 个引擎。我原以为并行化代码 运行 的时间是 运行 串行代码所用时间的 ~1/4,但显然事实并非如此。

这是为什么以及如何纠正它?

感谢您的任何建议。

格雷格

我做了一些更改以简化您的示例。在我的 Mac 上,串行版本 运行s 在 ~18 秒内,而具有 4 个引擎的并行版本 运行s 在大约一半的时间内。鉴于任务的持续时间不均匀,这似乎是合理的。

按照之前的设置方式,引擎中会出现错误,因此速度较快 returns。似乎通过字典传递 类 是不够的。相反,代码现在导入在每个引擎上定义 类 的模块。 请注意,我只是针对此示例 sys.path 进行了修改 ,但大概在生产环境中您会适当地处理它。

我认为您不需要在循环中使用 "wait"。此外,async_map() 方法似乎比 async_apply().

更方便

到运行这个,创建一个目录,将下面的代码复制到该目录下一个名为"photon.py"的文件中,并创建一个空的“init .py”也在那里。修改代码中插入 sys.path 的行以引用您的新目录。在那里更改目录 运行 "python photon.py":

# photon.py

import ipyparallel
import numpy as np
from numpy.random import random as rand
import time

NPHOTONS = 100000 # Nb photons
PI  = np.pi
EPS = 1.e-6
L = 100. # scattering layer thickness

class Photon():

    mut = 0.02 
    k = [0,0,1]

    def __init__(self,ko,pos):
        Photon.k = ko
        self.x = pos[0]
        self.y = pos[1]
        self.z = pos[2]

    def move(self):
        ksi = rand(1)
        s = -np.log(1-ksi)/Photon.mut

        self.x = self.x + s*Photon.k[0]
        self.y = self.y + s*Photon.k[1]
        self.z = self.z + s*Photon.k[2]       
        zPos = self.z
        return zPos 

    def exittop(self):

        newZpos = 0


    def exitbase(self):
        newZpos = 0


    def HG(self,g):
        rand_teta = rand(1)
        costeta = 0.5*(1+g**2-((1-g**2)/(1-g + 2.*g*rand_teta))**2)/g

        return costeta

    def scatter(self):
        # calculate new angle of scattering
        phi = 2*PI*rand(1)                
        costeta = self.HG(0.85)
        sinteta = (1-costeta**2)**0.5 


        sinphi = np.sin(phi) 
        cosphi = np.cos(phi)

        temp = (1-Photon.k[2]**2)**0.5

        if np.abs(temp) > EPS:        

            mux = sinteta*(Photon.k[0]*Photon.k[2]*cosphi-Photon.k[1]*sinphi)/temp + Photon.k[0]*costeta 
            muy = sinteta*(Photon.k[1]*Photon.k[2]*cosphi+Photon.k[0]*sinphi)/temp + Photon.k[1]*costeta
            muz = -sinteta*cosphi*temp + Photon.k[2]*costeta

        else:
            mux = sinteta*cosphi 
            muy = sinteta*sinphi
            if Photon.k[2]>=0:
                muz = costeta
            else:
                muz = -costeta


        # update the new direction of the photon 
        Photon.k[0] = mux
        Photon.k[1] = muy
        Photon.k[2] = muz        


class RunPhotonPackage():

    def __init__(self,L,NPHOTONS):
        self.L = L
        self.NPHOTONS = NPHOTONS

    def RunPhoton(self):
        Dist_Pos = np.zeros((3,self.NPHOTONS))
        # loop over number of photon
        for i in range(self.NPHOTONS):

            # inititate initial photon direction
            k_init = [0,0,1]
            k_init_norm = k_init/np.linalg.norm(k_init) # initial photon direction.
            # initiate new photon with initial direction   
            pos_init = [0,0,0]
            newPhoton = Photon(k_init_norm,pos_init)
            newZpos = 0.

            # while the photon is still in the layer, move it and scatter it
            while ((newZpos >= 0.) and (newZpos <= self.L)):

                newZpos = newPhoton.move()
                newscatter = newPhoton.scatter()

            Dist_Pos[0,i] = newPhoton.x
            Dist_Pos[1,i] = newPhoton.y
            Dist_Pos[2,i] = newPhoton.z

        return Dist_Pos

def RunPhoton(L):
    print('L={0}'.format(L))
    return RunPhotonPackage(L, 10000).RunPhoton()

def serialTest(values):
    print "Running serially..."
    tic = time.time()
    results = map(RunPhoton, values)
    print results
    toc = time.time()
    print('sec Elapsed: {0}s'.format(toc-tic))

def parallelTest(values):
    print "Running in parallel..."
    client = ipyparallel.Client()
    view = client[:]

    view.execute('import sys')

    # CHANGE THIS PATH TO REFER TO WHEREVER YOU PUT THIS CODE
    view.execute('sys.path.insert(0, "/Users/rjp/ipp")')
    view.execute('from photon import *')

    tic = time.time()
    asyncResults = view.map_async(RunPhoton, values)
    print asyncResults.get()
    toc = time.time()
    print('sec Elapsed: {0}s'.format(toc-tic))    


if __name__ == "__main__":
    values = np.arange(10, 100, 10)

    serialTest(values)
    parallelTest(values)