Python multiprocessing - 跟踪 pool.map 操作的进程

Python multiprocessing - tracking the process of pool.map operation

我有一个函数可以执行一些模拟和 returns 字符串格式的数组。

我想运行模拟(功能) 不同的输入参数值,超过 10000 个可能的输入值, 并将结果写入单个文件。

我正在使用多处理,特别是 pool.map 函数 运行 并行模拟。

自运行整个过程模拟功能超过10000次 耗时很长,很想跟踪整个操作过程。

我认为我当前下面的代码中的问题是,pool.map 运行 函数执行了 10000 次,在这些操作期间没有任何进程跟踪。一旦并行处理完成 运行ning 10000 次模拟(可能是几小时到几天。),然后我会继续跟踪 10000 次模拟结果何时被保存到文件中。所以这并不是真正跟踪 pool.map 操作。

我的代码是否有允许进程跟踪的简单修复程序?

def simFunction(input):
    # Does some simulation and outputs simResult
    return str(simResult)

# Parallel processing

inputs = np.arange(0,10000,1)

if __name__ == "__main__":
    numCores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes = numCores)
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out:
        print("Starting to simulate " + str(len(inputs)) + " input values...")
        counter = 0
        for i in t:
            out.write(i + '\n')
            counter = counter + 1
            if counter%100==0:
                print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
    print('Finished!!!!')

没有"easy fix"。 map 就是对你隐藏实施细节。在这种情况下,您 需要 详细信息。也就是说,根据定义,事情变得有点复杂。你需要改变沟通范式。有很多方法可以做到这一点。

一个是:创建一个队列来收集你的结果,让你的工作人员把结果放入这个队列中。然后,您可以从监视线程或进程中查看队列,并在结果进入时使用它们。在使用时,您可以分析它们并生成日志输出。这可能是跟踪进度的最通用方式:您可以以任何方式实时响应传入的结果。

一种更简单的方法可能是稍微修改您的辅助函数,并在其中生成日志输出。通过使用外部工具(例如 grepwc)仔细分析日志输出,您可以想出非常简单的跟踪方法。

请注意,我使用的是 pathos.multiprocessing 而不是 multiprocessing 它只是 multiprocessing 的一个分支,可以让您执行 map 具有多个输入的函数,具有更好的序列化,并允许您在任何地方执行 map 调用(不仅仅是在 __main__ 中)。您也可以使用 multiprocessing 来执行以下操作,但是代码会略有不同。

如果您使用迭代 map 函数,跟踪进度非常容易。

from pathos.multiprocessing import ProcessingPool as Pool
def simFunction(x,y):
  import time
  time.sleep(2)
  return x**2 + y
 
x,y = range(100),range(-100,100,2)
res = Pool().imap(simFunction, x,y)
with open('results.txt', 'w') as out:
  for i in x:
    out.write("%s\n" % res.next())
    if i%10 is 0:
      print "%s of %s simulated" % (i, len(x))
0 of 100 simulated
10 of 100 simulated
20 of 100 simulated
30 of 100 simulated
40 of 100 simulated
50 of 100 simulated
60 of 100 simulated
70 of 100 simulated
80 of 100 simulated
90 of 100 simulated

或者,您可以使用异步 map。在这里我会做一些不同的事情,只是为了混合起来。

import time
res = Pool().amap(simFunction, x,y)
while not res.ready():
  print "waiting..."
  time.sleep(5)
 
waiting...
waiting...
waiting...
waiting...
res.get()
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]

无论是迭代还是异步 map 都可以让您编写任何代码来更好地跟踪进程。例如,将唯一的“id”传递给每个作业,然后观察哪个返回,或者让每个作业 return 它是进程 id。有很多方法可以跟踪进度和过程……但以上内容应该可以给您一个开始。

你可以获得pathoshere.

我想你需要的是一个日志文件

我建议您使用 logging 模块,它是 Python 标准库的一部分。但不幸的是 logging 不是多处理安全的。所以你不能在你的应用程序中开箱即用。

因此,您将需要使用多处理安全日志处理程序或使用队列或锁以及 logging 模块来实现您的日志处理程序。

在 Whosebug 中对此有很多讨论。例如:How should I log while using multiprocessing in Python?

如果大部分 CPU 负载在模拟函数中并且您不打算使用日志轮换,您可能可以使用像这样的简单锁定机制:

import multiprocessing
import logging

from random import random
import time


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(process)s %(levelname)s %(message)s',
    filename='results.log',
    filemode='a'
)


def simulation(a):
    # logging
    with multiprocessing.Lock():
        logging.debug("Simulating with %s" % a)

    # simulation
    time.sleep(random())
    result = a*2

    # logging
    with multiprocessing.Lock():
        logging.debug("Finished simulation with %s. Result is %s" % (a, result))

    return result

if __name__ == '__main__':

    logging.debug("Starting the simulation")
    inputs = [x for x in xrange(100)]
    num_cores = multiprocessing.cpu_count()
    print "num_cores: %d" % num_cores
    pool = multiprocessing.Pool(processes=num_cores)
    t = pool.map(simulation, inputs)
    logging.debug("The simulation has ended")

您可以在 运行 时 "tail -f" 您的日志文件。这是你应该看到的:

2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5

试过 Windows 和 Linux。

希望对您有所帮助