How to implement multiprocessing in Monte Carlo integration
I created a Python program that uses Monte Carlo simulation to integrate a given function over a given interval. It works great, except that it runs very slowly when you want higher accuracy (larger values of N). I figured I would try multiprocessing to speed it up, but then I realised I don't know how to implement it. Here is what I have right now:
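For reference, the estimator the code below computes is just the standard Monte Carlo average of f over uniform samples:

$$\int_a^b f(x)\,dx \approx \frac{b - a}{N} \sum_{i=1}^{N} f(x_i), \qquad x_i \sim \mathrm{Uniform}(a, b)$$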
from scipy import random
import numpy as np
import matplotlib.pyplot as plt
from multiprocessing import Process
import os

# GOAL: Approximate the integral of a function f(x) from lower bound a to upper bound b using Monte Carlo simulation

# bounds of integration
a = 0
b = np.pi

# function to integrate
def f(x):
    return np.sin(x)

N = 10000

areas = []

def mcIntegrate():
    for i in range(N):
        # array filled with random numbers between limits
        xrand = random.uniform(a, b, N)

        # sum the return values of the function of each random number
        integral = 0.0
        for i in range(N):
            integral += f(xrand[i])

        # scale integral by difference of bounds divided by amount of random values
        ans = integral * ((b - a) / float(N))

        # add approximation to list of other approximations
        areas.append(ans)

if __name__ == "__main__":

    processes = []
    numProcesses = os.cpu_count()

    for i in range(numProcesses):
        process = Process(target=mcIntegrate)
        processes.append(process)

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    # graph approximation distribution
    plt.title("Distribution of Approximated Integrals")
    plt.hist(areas, bins=30, ec='black')
    plt.xlabel("Areas")
    plt.show()
Can I get some help with this implementation?
Took the advice from the comments and used multiprocessing.Pool, and also cut down on some operations by using NumPy. Went from taking about 5 minutes to run to now about 6 seconds (for N = 10000). Here is my implementation:
import scipy
import numpy as np
import matplotlib.pyplot as plt
import multiprocessing
import os

# GOAL: Approximate the integral of function f from lower bound a to upper bound b using Monte Carlo simulation

a = 0        # lower bound of integration
b = np.pi    # upper bound of integration
f = np.sin   # function to integrate
N = 10000    # sample size

def mcIntegrate(p):
    xrand = scipy.random.uniform(a, b, N)     # create array filled with random numbers within bounds
    integral = np.sum(f(xrand))               # sum return values of function of each random number
    approx = integral * ((b - a) / float(N))  # scale integral by difference of bounds divided by sample size
    return approx

if __name__ == "__main__":

    # run simulation N times in parallel and store results in array
    with multiprocessing.Pool(os.cpu_count()) as pool:
        areas = pool.map(mcIntegrate, range(N))

    # graph approximation distribution
    plt.title("Distribution of Approximated Integrals")
    plt.hist(areas, bins=30, ec='black')
    plt.xlabel("Areas")
    plt.show()
This turned out to be a more interesting problem than I thought it would be when I got to optimising it. The basic approach is very simple:
from multiprocessing import Pool

def f(x):
    return x

if __name__ == "__main__":
    with Pool() as pool:
        results = pool.map(f, range(100))
Here is mcIntegrate adapted for multiprocessing:
from tqdm import tqdm

def mcIntegrate(steps):
    tasks = []

    print("Setting up simulations")
    # linear
    for _ in tqdm(range(steps)):
        xrand = random.uniform(a, b, steps)
        for i in range(steps):
            tasks.append(xrand[i])

    pool = Pool(cpu_count())

    print("Simulating (no progress)")
    results = pool.map(f, tasks)
    pool.close()

    print("summing")
    areas = []
    for chunk in tqdm(range(steps)):
        vals = results[chunk * steps : (chunk + 1) * steps]
        integral = sum(vals)
        ans = integral * ((b - a) / float(steps))
        areas.append(ans)

    return areas
tqdm is just used to display a progress bar.

This is the basic workflow of multiprocessing: break the problem up into tasks, solve all the tasks, then put them back together again. And indeed the code as given works. (Note that I have changed your N to steps.)
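As a minimal sketch of that break/solve/recombine pattern on its own (the solve function and the final sum here are illustrative placeholders, not part of the integration code):

from multiprocessing import Pool

def solve(task):
    # placeholder per-task work
    return task * task

if __name__ == "__main__":
    tasks = range(8)                       # 1. break the problem into tasks
    with Pool() as pool:
        partials = pool.map(solve, tasks)  # 2. solve all the tasks in parallel
    total = sum(partials)                  # 3. put the results back together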
For completeness, the script now starts:
from scipy import random
import numpy as np
import matplotlib.pyplot as plt
from multiprocessing import Pool, cpu_count
from tqdm import tqdm
# function to integrate
def f(x):
    return np.sin(x)
and ends:
a = 0
b = np.pi

areas = mcIntegrate(3_000)

plt.title("Distribution of Approximated Integrals")
plt.hist(areas, bins=30, ec="black")
plt.xlabel("Areas")
plt.show()
Optimisation
I deliberately broke the problem down to the smallest possible level. Was that a good idea? To answer that question, consider: how do we optimise the linear process of generating the tasks? This does currently take quite a while. We could parallelise it:
def _prepare(steps):
    xrand = random.uniform(a, b, steps)
    return [xrand[i] for i in range(steps)]

def mcIntegrate(steps):

    ...

    tasks = []
    for res in tqdm(pool.imap(_prepare, (steps for _ in range(steps))), total=steps):
        tasks += res  # slower except for very large steps
Here I have used pool.imap, which returns an iterator that we can iterate over as soon as results become available, which lets us build a progress bar. If you do this and compare, you will find that it runs slower than the linear solution. Removing the progress bar (on my machine) and replacing it with:
import time

start = time.perf_counter()
results = pool.map(_prepare, (steps for _ in range(steps)))
tasks = []
for res in results:
    tasks += res
print(time.perf_counter() - start)
is only marginally quicker: it is still slower than running it linearly. There is overhead in serialising the data to the process and then deserialising it again. And if you try to get a progress bar over the whole thing, it becomes extremely slow:
results = []
for result in tqdm(pool.imap(f, tasks), total=len(tasks)):
    results.append(result)
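As an aside, one thing the code above does not use is imap's optional chunksize argument, which hands tasks to the workers in batches and so reduces the per-item overhead; a sketch of that variant (the value 1000 is just a guess that would need profiling):

results = []
for result in tqdm(pool.imap(f, tasks, chunksize=1000), total=len(tasks)):
    results.append(result)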
So what about iterating at a higher level? Here is another adaptation of your mcIntegrate:
a = 0
b = np.pi

def _mcIntegrate(steps):
    xrand = random.uniform(a, b, steps)
    integral = 0.0
    for i in range(steps):
        integral += f(xrand[i])
    ans = integral * ((b - a) / float(steps))
    return ans

def mcIntegrate(steps):
    areas = []
    p = Pool(cpu_count())
    for ans in tqdm(p.imap(_mcIntegrate, (steps for _ in range(steps))), total=steps):
        areas.append(ans)
    return areas
This is much faster on my machine. It is also a lot simpler. I expected there to be a difference, but not such a considerable difference.
Takeaways
Multiprocessing isn't free. Something as simple as np.sin() is much too cheap to multiprocess: we pay to serialise, deserialise, append, and so on, all for a single sin() calculation. But bundle too many calculations into each task and you also waste time, because you lose granularity. The effect here was more striking than I expected. The only way to know the right level of granularity for a particular problem is to profile and try.
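A minimal way to "profile and try", assuming the two adaptations above live in one script (mcIntegrate_fine and mcIntegrate_coarse are hypothetical names here, since both versions above are called mcIntegrate):

import time

def timed(label, fn, *args):
    # small timing helper for comparing granularities
    start = time.perf_counter()
    result = fn(*args)
    print(f"{label}: {time.perf_counter() - start:.2f}s")
    return result

if __name__ == "__main__":
    timed("one sin() per task", mcIntegrate_fine, 3_000)
    timed("one estimate per task", mcIntegrate_coarse, 3_000)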
My experience is that multiprocessing is often not very efficient (a ton of overhead). The more you push your code into numpy, the faster it will be, with one caveat: you can overload your memory if you're not careful (10k x 10k is getting big). Lastly, it looks like N is doing double duty, both defining the sample size for each estimate and also serving as the number of trial estimates.

Here is how I would go about this (with slight stylistic changes):
import numpy as np

f = np.sin
a = 0
b = np.pi

# number of samples for each trial, trial count, and number calculated at once
N = 10000
TRIALS = 10000
BATCH_SIZE = 1000

def mc_integrate(f, a, b, N, batch_size=BATCH_SIZE):
    # compute everything carrying `batch_size` copies by extending the array dimension
    # samples.shape == (N, batch_size)
    samples = np.random.uniform(a, b, size=(N, batch_size))
    integrals = np.sum(f(samples), axis=0)
    mc_estimates = integrals * ((b - a) / N)
    return mc_estimates

# loop over batch values to get final result
n, r = divmod(TRIALS, BATCH_SIZE)
results = []
for j in [BATCH_SIZE] * n + [r]:
    results.extend(mc_integrate(f, a, b, N, batch_size=j))
This takes a couple of seconds on my machine.
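To put a number on the memory caveat above: the batched samples array holds N x batch_size float64 values, and its footprint is easy to check before picking a batch size (a quick sanity check, not part of the code above):

import numpy as np

# 10000 x 1000 samples at 8 bytes each is about 80 MB per batch
samples = np.empty((10000, 1000))
print(samples.nbytes / 1e6, "MB")  # 80.0 MB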