Python Apply_async 不等待其他进程完成

Python Apply_async not waiting for other Processes to Finish

我有以下示例代码,我正在尝试在其上使用多处理模块。以下语句之前在其他应用程序下一直有效,但是一个进程(由于中断而接收到非常少量的数据)首先完成并导致程序完成。有人可以帮我理解为什么这不等待其他人吗?

def mpProcessor(basePath, jsonData, num_procs = mp.cpu_count()):
manager = mp.Manager()
map = manager.dict()
procs = mp.Pool(processes = num_procs, maxtasksperchild = 1)
chunkSize = len(jsonData) / (num_procs)
dataChunk = [(i, i + chunkSize) for i in range(0, len(jsonData), chunkSize)]
count = 1
for i in dataChunk:
    print 'test'
    s, e = i
    procs.apply_async(processJSON, args = (count, basePath, jsonData[s:e]))
    count += 1
procs.close()
procs.join()
return map

def processJSON(proc, basePath, records):
print 'Spawning new process: %d' %os.getpid()
outDict = dict()
print len(records)
for i in range(len(records)):
    valid = False
    idx = 0
    while valid == False:
        jsonObject = json.loads(records[i][1])['results'][idx]
        if jsonObject['kind'] == 'song':
            valid = True
            break
        else:
            idx += 1
    tunesTrack = Track()
    tunesTrack.setTrackId(jsonObject['trackId'])
print 'Finished processing %d records with process %d' %(len(records), os.getpid())

你好像在重新发明轮子。

通过对池使用初始化程序并使用 map 而不是 apply_async,可以更轻松地实现您正在尝试做的事情。就目前而言,您的代码片段不可运行,因此我无法确定实际问题是什么。但是,以下应该会简化您的代码并使其更易于调试。

import math
import multiprocessing as mp

def pool_init(basePath_):
    global basePath, job_count
    basePath = basePath_
    job_count = 0
    print 'Spawning new process: %d' %os.getpid()

def mpProcessor(basePath, jsonData, num_procs=mp.cpu_count()):
    pool = mp.Pool(processes=num_procs, initializer=pool_init, initargs=(basePath,))
    # could specify a chunksize, but multiprocessing works out the optimal chunksize
    return pool.map(processJSON, jsonData)

# change processJSON to work with single records and
# remove proc and basePath args (as not needed)
def processJSON(record):
    global job_count
    print 'Starting job %d in process: %d' % (job_count, os.getpid())
    valid = False
    idx = 0
    while valid == False:
        jsonObject = json.loads(record[1])['results'][idx]
        if jsonObject['kind'] == 'song':
            valid = True
            break
        else:
            idx += 1
    tunesTrack = Track()
    tunesTrack.setTrackId(jsonObject['trackId'])
    print 'Finished processing job %d with process %d' % (job_count,  os.getpid())
    job_count += 1