如何使带有两个 for 循环 运行 的 python 代码更快(是否有 python 执行 Mathematica 并行化的方法)?

How to make the python code with two for loop run faster(Is there a python way of doing Mathematica's Parallelize)?

我完全不熟悉 python 或任何此类编程语言。我对 Mathematica 有一些经验。我有一个数学问题,尽管 Mathematica 使用她自己的 'Parallelize' 方法解决了这个问题,但在使用了所有核心后系统却非常疲惫!在 运行 期间我几乎无法使用机器。因此,我一直在寻找一些编码替代方案,并发现 python 一种易于学习和实施的方法。因此,事不宜迟,让我告诉您我的 python 代码的数学问题和问题。由于完整代码太长,我给个大纲吧。

1. Numericall 求解形式为 y''(t) + f(t)y(t)=0 的微分方程,以获得某个范围内的 y(t),例如 C <= t <= D

2.Next,将数值结果插值到某个所需的范围内,得到函数:w(t),比如 A <= t <= B

3。使用 w(t) 求解另一个形式为 z''(t) + [ a + b W(t)] z(t) =0 的微分方程,对于 a 和 b 的某个范围,我正在使用循环.

4. Deine F = 1 + sol1[157],生成一个类似 {a, b, F} 的列表。所以让我给出一个原型循环,因为这占用了大部分计算时间。

for q in np.linspace(0.0, 4.0, 100):
    for a in np.linspace(-2.0, 7.0, 100):
        print('Solving for q = {}, a = {}'.format(q,a))
        sol1 = odeint(fun, [1, 0], t, args=( a, q))[..., 0]
        print(t[157])
        F = 1 + sol1[157]                    
        f1.write("{}  {} {} \n".format(q, a, F))            
    f1.close()

现在,真正的循环大约需要 4 小时 30 分钟才能完成(对于 w(t) 的某些内置函数形式,大约需要 2 分钟)。什么时候,我在我的代码中定义 fun 之前应用(没有正确理解它的作用和方式!)numba/autojit, 运行 时间显着缩短,大约需要 2 小时 30 分钟。另外,写两个循环 itertools/product 进一步减少 运行 时间仅约 2 分钟!然而,当我让 Mathematica 使用所有 4 个核心时,Mathematica 在 30 分钟内完成了任务。

那么,有没有办法改善 python 中的 运行 时间?

要加快 python,您有三个选择:

  • 处理程序中的特定瓶颈(如@LutzL 的评论中所建议)
  • 尝试使用 cython (or including C code using weave 或类似技术将代码编译成 C 来加速代码)。由于您的情况下耗时的计算不在 python 代码中,而是在 scipy 模块中(至少我相信它们是),这在这里对您没有太大帮助。
  • 按照您在原始问题中的建议实施 multiprocessing。如果您有 X 个内核,这将使您的代码速度提高最多 X(略小于)倍。不幸的是,这在 python.
  • 中相当复杂

实施多处理 - 使用原始问题中的原型循环的示例

我假设您在原型代码的嵌套循环内进行的计算实际上是相互独立的。由于您的原型代码不完整,因此我不确定情况是否如此。否则,它当然不会起作用。我将举一个例子,不是你的 fun 函数的微分方程问题,而是相同签名(输入和输出变量)的原型。

import numpy as np
import scipy.integrate
import multiprocessing as mp

def fun(y, t, b, c):
    # replace this function with whatever function you want to work with
    #    (this one is the example function from the scipy docs for odeint)
    theta, omega = y
    dydt = [omega, -b*omega - c*np.sin(theta)]
    return dydt

#definitions of work thread and write thread functions

def run_thread(input_queue, output_queue):
    # run threads will pull tasks from the input_queue, push results into output_queue
    while True:
        try:
            queueitem = input_queue.get(block = False)
            if len(queueitem) == 3:
                a, q, t = queueitem
                sol1 = scipy.integrate.odeint(fun, [1, 0], t, args=( a, q))[..., 0]
                F = 1 + sol1[157]
                output_queue.put((q, a, F))
        except Exception as e:
            print(str(e))
            print("Queue exhausted, terminating")
            break

def write_thread(queue):    
    # write thread will pull results from output_queue, write them to outputfile.txt
    f1 = open("outputfile.txt", "w")
    while True:
        try:
            queueitem = queue.get(block = False)
            if queueitem[0] == "TERMINATE":
                f1.close()
                break
            else:
                q, a, F = queueitem                
                print("{}  {} {} \n".format(q, a, F))            
                f1.write("{}  {} {} \n".format(q, a, F))            
        except:
            # necessary since it will throw an error whenever output_queue is empty
            pass

# define time point sequence            
t = np.linspace(0, 10, 201)

# prepare input and output Queues
mpM = mp.Manager()
input_queue = mpM.Queue()
output_queue = mpM.Queue()

# prepare tasks, collect them in input_queue
for q in np.linspace(0.0, 4.0, 100):
    for a in np.linspace(-2.0, 7.0, 100):
        # Your computations as commented here will now happen in run_threads as defined above and created below
        # print('Solving for q = {}, a = {}'.format(q,a))
        # sol1 = scipy.integrate.odeint(fun, [1, 0], t, args=( a, q))[..., 0]
        # print(t[157])
        # F = 1 + sol1[157]    
        input_tupel = (a, q, t)
        input_queue.put(input_tupel)

# create threads
thread_number = mp.cpu_count()
procs_list = [mp.Process(target = run_thread , args = (input_queue, output_queue)) for i in range(thread_number)]         
write_proc = mp.Process(target = write_thread, args = (output_queue,))

# start threads
for proc in procs_list:
    proc.start()
write_proc.start()

# wait for run_threads to finish
for proc in procs_list:
    proc.join()

# terminate write_thread
output_queue.put(("TERMINATE",))
write_proc.join()

说明

  • 我们在开始计算之前定义了各个问题(或者更确切地说是它们的参数);我们将它们收集在输入队列中。
  • 我们在线程中定义了一个函数(run_thread),即运行。此函数计算单个问题,直到输入队列中还剩下 none;它将结果推送到输出队列中。
  • 我们启动与 CPU 一样多的此类线程。
  • 我们启动一个额外的线程 (write_thread) 以从输出队列收集结果并将它们写入文件。

注意事项

  • 对于较小的问题,您可以 运行 不使用队列进行多处理。但是,如果单个计算的数量很大,您将超过内核允许的最大线程数,然后内核会终止您的程序。
  • 不同操作系统之间的多处理工作方式存在差异。上面的示例将在 Linux(可能也适用于其他类似 Unix 的系统,例如 Mac 和 BSD)、 上运行。原因是 Windows 没有 fork() 系统调用。 (我无权访问 Windows,因此无法尝试为 Windows 实施它。)