python multiprocessing .get() never ends

I'm here because of a multiprocessing problem in my Python script. My code is:

def filter_list_of_list_values(myList,myFilterList):
    for index in range(len(myList)):
        print(index)
        sub_array = myList[index]
        for stopword in myFilterList :
            sub_array = list(filter(lambda a: a != stopword, sub_array))
        sub_array = [w for w in sub_array if not w in myFilterList]
        myList[index] = sub_array
    return myList  

import multiprocessing
import numpy as np
# Run with multiprocessing, splitting the list into as many parts as there are processes:
N_PROCS = 6
#Lists splitting : 
L_sub_lists  = np.array_split(tokenized_text, N_PROCS)


final_List = []
start_time = time.time()


print('Creating pool with %d processes\n' % N_PROCS)
with multiprocessing.Pool(N_PROCS) as pool:
    # Build a list of tasks that all call the same function,
    # each with a different sub-list
    TASKS = [(sub_list, english_stopwords) for sub_list in L_sub_lists]
    print("TASK OK")

    results = [pool.apply_async(filter_list_of_list_values, t) for t in TASKS]
    print("results OK")
    
    final_results = [r.get() for r in results]
    print("final_results OK")

    for sub_list_res in final_results:
        print("appending")
        final_List+= sub_list_res
        print("list_append")
    
print("--- %s seconds ---" % (time.time() - start_time))

The script hangs at:

final_results = [r.get() for r in results]

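I really don't understand why, because I have used the same script (with minor differences) in another context (a different function, applied to a DataFrame rather than a list of lists) and everything worked fine.

An example: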

L = [['Paris', 'Monaco', 'Washington', 'Lyon', 'Venise', 'Marseille'],
 ['New-York', 'NapleWashington', 'Turin', 'Chicago', 'Las Vegas'],
 ['Lyon', 'Rome', 'Chicago', 'Venise', 'Naple', 'Turin']]

filter_list_of_list_values(L,['Lyon','Turin','Chicago'])

results in:

[['Paris', 'Monaco', 'Washington', 'Venise', 'Marseille'], ['New-York', 'NapleWashington', 'Las Vegas'], ['Rome', 'Venise', 'Naple']]

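So, I took a look, and your code appears to be almost correct. What is missing is how you actually form your input. Your sample code fails as posted because you don't import time and never define tokenized_text, and I don't know what the input is actually supposed to look like. But! Based on your example, your code does work, so I suspect that whatever you are doing to form your input is incorrect.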
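Since tokenized_text is not shown, one way to rule out the splitting step is to build the sub-lists in plain Python instead of np.array_split, so that each task receives ordinary lists of strings. This is only a minimal sketch, assuming tokenized_text is a list of token lists; split_into_chunks is a hypothetical helper, not part of your code, and the tokenized_text below is a made-up stand-in:

```python
def split_into_chunks(items, n_chunks):
    """Split `items` into `n_chunks` contiguous, nearly equal parts.

    Hypothetical helper: the first `len(items) % n_chunks` parts get one
    extra element. Parts are empty when there are fewer items than chunks.
    """
    if n_chunks < 1:
        raise ValueError("n_chunks must be at least 1")
    base, extra = divmod(len(items), n_chunks)
    chunks = []
    start = 0
    for i in range(n_chunks):
        size = base + (1 if i < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


# Assumed stand-in for the undefined `tokenized_text`.
tokenized_text = [['Paris', 'Lyon'], ['Turin'], ['Rome', 'Chicago', 'Naple']]
sub_lists = split_into_chunks(tokenized_text, 2)
print(sub_lists)
```

Each chunk stays a plain list of lists, so it can be passed to filter_list_of_list_values as is.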

Here is a basic working version of your code:

import time
import multiprocessing
import numpy as np

N_PROCS = 6
# L_sub_lists = np.array_split(tokenized_text, N_PROCS)

final_List = []
start_time = time.time()

L = [['Paris', 'Monaco', 'Washington', 'Lyon', 'Venise', 'Marseille'],
     ['New-York', 'NapleWashington', 'Turin', 'Chicago', 'Las Vegas'],
     ['Lyon', 'Rome', 'Chicago', 'Venise', 'Naple', 'Turin']]

filter_list = ['Lyon', 'Turin', 'Chicago']


def filter_list_of_list_values(myList, myFilterList):
    for index in range(len(myList)):
        sub_array = myList[index]
        for stop_word in myFilterList:
            sub_array = list(filter(lambda a: a != stop_word, sub_array))
        sub_array = [w for w in sub_array if w not in myFilterList]
        myList[index] = sub_array
    return myList  


print(filter_list_of_list_values(L, filter_list))

print('Creating pool with %d processes\n' % N_PROCS)
with multiprocessing.Pool(N_PROCS) as pool:
    TASKS = [([sub_list], filter_list) for sub_list in L]
    print("TASK OK")

    results = [pool.apply_async(filter_list_of_list_values, t) for t in TASKS]
    print("results OK")

    print("Getting final results")
    final_results = [r.get() for r in results]
    print("final_results OK")

print("Printing final_results %s" % final_results)
print("--- %s seconds ---" % (time.time() - start_time))

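It basically just breaks the larger list of lists into smaller lists and processes them in subprocesses. I ran your main function once beforehand to confirm the output you expect and to verify that the distributed processing returns the same result. I think that is the point, but I'm not sure, because beyond "This code doesn't work" and "I expect these outputs" the question isn't very clear.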
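The serial-versus-parallel check described above can also be written so that the comparison is explicit. This is a sketch under assumptions, not your exact code: remove_stopwords is a hypothetical non-mutating variant of filter_list_of_list_values, run_parallel is a hypothetical wrapper, and the main guard is added because platforms that start workers with the spawn method re-import the main module.

```python
import multiprocessing


def remove_stopwords(list_of_lists, stopwords):
    """Return a new list of lists with every stopword removed.

    Hypothetical non-mutating variant of filter_list_of_list_values.
    """
    blocked = set(stopwords)
    return [[w for w in sub if w not in blocked] for sub in list_of_lists]


def run_parallel(list_of_lists, stopwords, n_procs=2):
    """Filter each sub-list in a worker process and flatten the results."""
    tasks = [([sub], stopwords) for sub in list_of_lists]
    with multiprocessing.Pool(n_procs) as pool:
        # starmap unpacks each (list, stopwords) tuple into two arguments.
        nested = pool.starmap(remove_stopwords, tasks)
    # Each task returns a one-element list of lists; flatten one level.
    return [sub for part in nested for sub in part]


if __name__ == "__main__":
    L = [['Paris', 'Monaco', 'Washington', 'Lyon', 'Venise', 'Marseille'],
         ['New-York', 'NapleWashington', 'Turin', 'Chicago', 'Las Vegas'],
         ['Lyon', 'Rome', 'Chicago', 'Venise', 'Naple', 'Turin']]
    filter_list = ['Lyon', 'Turin', 'Chicago']
    serial = remove_stopwords(L, filter_list)
    parallel = run_parallel(L, filter_list)
    # Report whether both code paths agree.
    print(serial == parallel)
```

Because remove_stopwords does not modify its input, the serial run cannot influence what the workers receive, which makes the comparison easier to trust.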

Here is the script output:

[['Paris', 'Monaco', 'Washington', 'Venise', 'Marseille'], ['New-York', 'NapleWashington', 'Las Vegas'], ['Rome', 'Venise', 'Naple']]
Creating pool with 6 processes

TASK OK
results OK
Getting final results
final_results OK
Printing final_results [[['Paris', 'Monaco', 'Washington', 'Venise', 'Marseille']], [['New-York', 'NapleWashington', 'Las Vegas']], [['Rome', 'Venise', 'Naple']]]
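
If r.get() still blocks with your real input, passing a timeout turns the silent hang into an exception you can inspect. A hedged sketch: get_or_report and slow_task are hypothetical names, and ThreadPool is used here only so the example runs without any pickling concerns; results returned by multiprocessing.Pool.apply_async offer the same get(timeout=...) method.

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool


def get_or_report(async_result, timeout):
    """Return (True, value) on success, or (False, None) if get() times out.

    Hypothetical helper around AsyncResult.get(timeout=...).
    """
    try:
        return True, async_result.get(timeout=timeout)
    except multiprocessing.TimeoutError:
        return False, None


def slow_task(seconds):
    """Stand-in for a worker that may take too long."""
    time.sleep(seconds)
    return seconds


if __name__ == "__main__":
    with ThreadPool(2) as pool:
        fast = pool.apply_async(slow_task, (0.01,))
        slow = pool.apply_async(slow_task, (1.0,))
        print(get_or_report(fast, timeout=5))
        print(get_or_report(slow, timeout=0.1))
```

A task that times out here is still running or was never picked up, which at least tells you which sub-list to look at first.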