如何在 Monte Carlo 集成中实现多处理

How to implement multiprocessing in Monte Carlo integration

我创建了一个 Python 程序,该程序使用 Monte Carlo 模拟在给定时间间隔内集成给定函数。它工作得很好,除了当你想要更高的准确度(更大的 N 值)时它运行得非常慢。我想我会尝试多处理以加快速度,但后来我意识到我不知道如何实现它。这是我现在拥有的:

from scipy import random
import numpy as np
import matplotlib.pyplot as plt
from multiprocessing import Process
import os

# GOAL: Approximate the integral of a function f(x) from lower bound a to upper bound b using Monte Carlo simulation

# bounds of integration
a = 0
b = np.pi

# function to integrate
def f(x):
    return np.sin(x)

N = 10000
areas = []


def mcIntegrate():
    
    for i in range(N):
        
        # array filled with random numbers between limits
        xrand = random.uniform(a, b, N)
        
        # sum the return values of the function of each random number
        integral = 0.0
        for i in range(N):
            integral += f(xrand[i])
        
        # scale integral by difference of bounds divided by amount of random values
        ans = integral * ((b - a) / float(N))
        
        # add approximation to list of other approximations
        areas.append(ans)


if __name__ == "__main__":

    processes = []
    numProcesses = os.cpu_count()

    for i in range(numProcesses):
        process = Process(target=mcIntegrate)
        processes.append(process)

    for process in processes:
        process.start()

    for process in processes:
        process.start()

    # graph approximation distribution
    plt.title("Distribution of Approximated Integrals")
    plt.hist(areas, bins=30, ec='black')
    plt.xlabel("Areas")
    plt.show()

我可以就此实施获得一些帮助吗?


听取了评论的建议并使用了 multiprocessor.Pool,还通过使用 NumPy 减少了一些操作。从大约需要 5 分钟到 运行 现在大约需要 6 秒(对于 N = 10000)。这是我的实现:

import scipy
import numpy as np
import matplotlib.pyplot as plt
import multiprocessing
import os

# GOAL: Approximate the integral of function f from lower bound a to upper bound b using Monte Carlo simulation

a = 0       # lower bound of integration
b = np.pi   # upper bound of integration
f = np.sin  # function to integrate
N = 10000   # sample size

def mcIntegrate(p):
    xrand = scipy.random.uniform(a, b, N)       # create array filled with random numbers within bounds
    integral = np.sum(f(xrand))                 # sum return values of function of each random number
    approx = integral * ((b - a) / float(N))    # scale integral by difference of bounds divided by sample size
    return approx


if __name__ == "__main__":
    
    # run simulation N times in parallel and store results in array
    with multiprocessing.Pool(os.cpu_count()) as pool:
        areas = pool.map(mcIntegrate, range(N))

    # graph approximation distribution
    plt.title("Distribution of Approximated Integrals")
    plt.hist(areas, bins=30, ec='black')
    plt.xlabel("Areas")
    plt.show()

事实证明这是一个比我在优化它时想象的更有趣的问题。基本方法很简单:

from multiprocessing import pool

def f(x):
    return x

results = pool.map(f, range(100))

这是适合多处理的 mcIntegerate

from tqdm import tqdm

def mcIntegrate(steps):
    tasks = []

    print("Setting up simulations")

    # linear
    for _ in tqdm(range(steps)):
        xrand = random.uniform(a, b, steps)
        for i in range(steps):
            tasks.append(xrand[i])

    pool = Pool(cpu_count())

    print("Simulating (no progress)")
    results = pool.map(f, tasks)
    pool.close()

    print("summing")
    areas = []
    for chunk in tqdm(range(steps)):
        vals = results[chunk * steps : (chunk + 1) * steps]
        integral = sum(vals)
        ans = integral * ((b - a) / float(steps))
        areas.append(ans)

    return areas

tqdm只是用来显示一个进度条。

这是多处理的基本工作流程:将问题分解为任务,解决所有任务,然后将它们重新组合在一起。确实,给定的代码有效。 (请注意,我已将您的 N 更改为 steps)。

为了完整起见,脚本现在开始:

from scipy import random
import numpy as np
import matplotlib.pyplot as plt
from multiprocessing import Pool, cpu_count
from tqdm import tqdm

# function to integrate
def f(x):
    return np.sin(x)

结束

areas = mcIntegrate(3_000)

a = 0
b = np.pi

plt.title("Distribution of Approximated Integrals")
plt.hist(areas, bins=30, ec="black")
plt.xlabel("Areas")
plt.show()

优化

我故意将问题分解为尽可能小的级别。这是个好主意吗?要回答这个问题,请考虑:我们如何优化生成任务的线性过程?目前这确实需要相当长的时间。我们可以将它并行化:

def _prepare(steps):
    xrand = random.uniform(a, b, steps)
    return [xrand[i] for i in range(steps)]

def mcIntegrate(steps):
    ...
    tasks = []
    for res in tqdm(pool.imap(_prepare, (steps for _ in range(steps))), total=steps):
        tasks += res  # slower except for very large steps

这里我使用了 pool.imap,其中 returns 是一个迭代器,我们可以在结果可用时立即对其进行迭代,从而允许我们构建进度条。如果您这样做并进行比较,您会发现它比线性解决方案运行 。删除进度条(在我的机器上)并替换为:

    import time

    start = time.perf_counter()
    results = pool.map(_prepare, (steps for _ in range(steps)))
    tasks = []
    for res in results:
        tasks += res
    print(time.perf_counter() - start)

只是稍微快一点:它 仍然 比 运行 线性慢。将数据序列化到进程然后反序列化它会产生开销。如果你试图在整个事情上获得一个进度条,它会变得极其缓慢:

    results = []
    for result in tqdm(pool.imap(f, tasks), total=len(tasks)):
        results.append(result)

那么在更高层次上迭代呢?这是对您的 mcIterate:

的另一种改编
a = 0
b = np.pi

def _mcIntegrate(steps):
    xrand = random.uniform(a, b, steps)
    integral = 0.0
    for i in range(steps):
        integral += f(xrand[i])
    ans = integral * ((b - a) / float(steps))

    return ans


def mcIntegrate(steps):
    areas = []
    p = Pool(cpu_count())
    for ans in tqdm(p.imap(_mcIntegrate, ((steps) for _ in range(steps))), total=steps):
        areas.append(ans)

    return areas

这在 我的 机器上要快得多。它也简单得多。我原以为会有所不同,但并没有那么大的不同。

外卖

多重处理不是免费的。像 np.sin() 这样简单的东西 对多进程来说太便宜 了:我们为序列化、反序列化、附加等支付费用,所有这些都是为了一次 sin() 计算。但是如果你做太多许多计算,你会浪费时间,因为你失去了粒度。这里的效果比我预期的更显着。了解特定问题的正确粒度级别的唯一方法是分析和尝试。

我的经验是,多处理通常效率不高(大量开销)。将代码推送到 numpy 中的次数越多,它就会越快,但有一个警告;如果你不小心,你的内存可能会过载(10k x 10k 越来越大)。最后,看起来 N 正在执行双重任务,既定义了每个估计的样本量,又作为试验估计的数量。

以下是我将如何执行此操作(稍作样式更改):

import numpy as np
f = np.sin
a = 0
b = np.pi

# number samples for each trial, trial count, and number calculated at once
N = 10000
TRIALS = 10000
BATCH_SIZE=1000

def mc_integrate(f, a, b, N, batch_size=BATCH_SIZE):
    # compute everything carrying `batch_size` copies by extending the array dimension.

    # samples.shape == (N, batch_size)
    samples = np.random.uniform(a, b, size=(N, batch_size))

    integrals = np.sum(f(samples), axis=0)
    mc_estimates = integrals * ((b - a) / N)    
    return mc_estimates


# loop over batch values to get final result
n, r = divmod(TRIALS, BATCH_SIZE)
results = []
for j in [BATCH_SIZE]*n + [r]:
    results.extend(mc_integrate(f, a, b, N, batch_size=j))

在我的机器上这需要几秒钟。