为所有进程存储只读字符串数组的多处理

Multiprocessing storing read-only string-array for all processes

我正在尝试为涉及读取大量文件并分析它们的任务创建工作人员。

我想要这样的东西:

list_of_unique_keys_from_csv_file = [] # About 200mb array (10m rows)
# a list of uniquekeys for comparing inside worker processes to a set of flatfiles

我需要更多线程,因为速度很慢,用一个进程进行比较(每个文件 10 分钟)。

我有另一组平面文件,我将 CSV 文件与之进行比较,以查看是否存在唯一键。这似乎是 map reduce 类型的问题。

main.py:

def worker_process(directory_glob_of_flat_files, list_of_unique_keys_from_csv_file):
  # Do some parallel comparisons "if not in " type stuff. 
  # generate an array of
  # lines of text like : "this item_x was not detected in CSV list (from current_flatfile)"
  if current_item not in list_of_unique_keys_from_csv_file:
     all_lines_this_worker_generated.append(sometext + current_item)
  return all_lines_this_worker_generated




def main():   
    all_results = []
    pool = Pool(processes=6)
    partitioned_flat_files = [] # divide files from glob by 6
    results = pool.starmap(worker_process, partitioned_flat_files, {{{{i wanna pass in my read-only parameter}}}})
    pool.close()
    pool.join()

    all_results.extend(results )
    resulting_file.write(all_results)

我同时使用 linux 和 windows 环境,所以也许我需要一些跨平台兼容的东西(整个 fork() 讨论)。

主要问题:我是否需要某种管道或队列,我似乎找不到很好的例子来说明如何在一个大的只读字符串数组周围传输,每个工人一个副本进程?

你只需要拆分你的只读参数,然后传入即可。multiprocessing模块是跨平台兼容的,所以不用担心。

其实每个进程,甚至是子进程,都有自己的资源,也就是说不管你怎么传参数给它,它都会保留一份原始的,而不是共享的。在这个简单的例子中,当您将参数从主流程传递到子流程时,Pool 会自动复制您的变量。因为子进程只有原始进程的副本,所以不能共享修改。在这种情况下没关系,因为您的变量是只读的。

但是要小心你的代码,你需要把你需要的参数包装到一个 可迭代集合,例如:

def add(a, b):
    return a + b

pool = Pool()
results = pool.starmap(add, [(1, 2), (3, 4)])
print(results)
# [3, 7]