在 concurrent.futures 中共享变量

Share variable in concurrent.futures

我正在尝试使用 concurrent.futures 对 mapreduce 进行单词计数器,之前我已经完成了一个多线程版本,但是速度很慢,因为 CPU 绑定了。 我做了映射部分,将单词分为['word1',1], ['word2,1], ['word1,1], ['word3',1]和进程之间,所以每个进程都会处理文本文件的一部分。下一步(“洗牌”)是将这些单词放入字典中,使其看起来像这样:word1: [1,1], word2:[1], word3: [1],但我不能在两者之间共享字典进程因为我们使用的是多进程而不是多线程,所以我怎样才能让每个进程将“1”添加到所有进程之间共享的字典中?我坚持这个,我不能继续。 我现在:

import sys
import re
import concurrent.futures
import time


# Read text file
def input(index):
    try:
        reader = open(sys.argv[index], "r", encoding="utf8")
    except OSError:
        print("Error")
        sys.exit()

    texto = reader.read()
    reader.close()
    return texto


# Convert text to list of words
def splitting(input_text):
    input_text = input_text.lower()
    input_text = re.sub('[,.;:!¡?¿()]+', '', input_text)
    words = input_text.split()
    n_processes = 4
    # Creating processes

    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = []
        for id_process in range(n_processes):
            results.append(executor.submit(mapping, words, n_processes, id_process))

        for f in concurrent.futures.as_completed(results):
            print(f.result())


def mapping(words, n_processes, id_process):
    word_map_result = []
    for i in range(int((id_process / n_processes) * len(words)),
                   int(((id_process + 1) / n_processes) * len(words))):
        word_map_result.append([words[i], 1])
    return word_map_result


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print("Please, specify a text file...")
        sys.exit()
    start_time = time.time()

    for index in range(1, len(sys.argv)):
        print(sys.argv[index], ":", sep="")
        text = input(index)
        splitting(text)
        # for word in result_dictionary_words:
        #   print(word, ':', result_dictionary_words[word])

    print("--- %s seconds ---" % (time.time() - start_time))

我看到在进行并发编程时通常最好尽量避免使用共享状态,那么如何在不在进程之间共享字典的情况下实现 Map reduce word count 呢?

您可以使用来自多处理的管理器创建共享字典。我从你的程序中了解到,你需要分享的是你的word_map_result

你可以试试这个

from multiprocessing import Manager

...
def splitting():
    ...
    word_map_result = Manager().dict()
    with concurrent.futures.....:
        ...
        results.append(executor.submit(mapping, words, n_processes, id_process, word_map_result)
        ...

    ...

def mapping(words, n_processes, id_process, word_map_result):
    for ...
    # Do not return anything - word_map_result is up to date in your main process

基本上,您将从映射函数中删除 word_map_result 的本地副本,并将其作为参数传递给 Manager 实例。这个 word_map_result 现在在所有子进程和主程序之间共享。但是,管理器会增加数据传输开销,因此这可能对您帮助不大。

在这种情况下,您不会 return 来自工作人员的任何东西,因此您不需要 for 循环来处理主程序中的结果 - 您的 word_map_result 在所有子进程中都是相同的,并且主程序。

我可能误解了你的问题,而且我不熟悉该算法是否可以重新设计它来工作,这样你就不需要在进程之间共享任何东西。

使用多处理似乎是一种误解。首先,创建池的开销和将数据传入和传出进程的开销。如果您决定使用工作函数 mapping 可以用来存储其结果的共享托管字典,请知道托管字典使用代理,其访问速度相当慢。使用托管字典的替代方法是您目前拥有的,即 mapping returns 一个列表,主进程使用这些结果来创建字典的键和值。但是 mapping 返回一个列表的意义何在,其中每个元素始终是两个元素的列表,其中第二个元素始终是常量值 1?这不是浪费时间和space吗?

我认为你的表现不会比仅仅实施拆分更快(可能更慢):

# Convert text to list of words
def splitting(input_text):
    input_text = input_text.lower()
    input_text = re.sub('[,.;:!¡?¿()]+', '', input_text)
    words = input_text.split()
    results = {}
    for word in words:
        results[word] = [1]
    return results