Parallel computing for loop with no last function

I am trying to use a script to read the contents of 16 gzip files in parallel:

import gzip
import glob

from dask import delayed
from dask.distributed import Client, LocalCluster

@delayed
def get_gzip_delayed(gzip_file):
    with gzip.open(gzip_file) as f:
        reads = f.readlines()
    reads = [read.decode("utf-8") for read in reads]
    return reads

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    read_files = glob.glob("*.txt.gz")
    all_files = []
    for file in read_files:
        reads = get_gzip_delayed(file)
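        # note: extend() iterates the Delayed object here, which raises the TypeError below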
        all_files.extend(reads)

    with open("all_reads.txt", "w") as f:
        w = delayed(all_files.writelines)(f)
    
    w.compute()

However, I receive the following error:

> TypeError: Delayed objects of unspecified length are not iterable

How do I parallelize the for loop using extend/append and write the output to a file? All the dask examples seem to include some final function performed on the product of the for loop.

The list all_files consists of delayed values, and calling delayed(f.writelines)(all_files) (note the argument differs from the code in the question) does not work for several reasons, the main one being that it prepares the lazy instruction for writing, but executes it only after the file has been closed.
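To see that failure concretely, here is a minimal sketch of the pitfall, assuming the default threaded scheduler and a throwaway payload:

from dask import delayed

with open("all_reads.txt", "w") as f:
    w = delayed(f.writelines)(["a line\n"])
# by the time the task runs, the with-block has already closed f
w.compute()  # ValueError: I/O operation on closed file.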

There are multiple ways of solving this problem; here are at least two:

  • If the data in the files fits into memory, then it is simplest to compute it and write it out:

import dask

(all_files,) = dask.compute(all_files)  # all_files is a list of delayed values (built with append, not extend)
with open("all_reads.txt", "w") as f:
    for reads in all_files:  # each element is one file's list of decoded lines
        f.writelines(reads)
  • If the data cannot fit into memory, then another option is to move the writing into the get_gzip_delayed function, so that the data is not transferred between the workers and the client (a driver sketch follows this list):
from dask.distributed import Lock

@delayed
def get_gzip_delayed(gzip_file):
    with gzip.open(gzip_file) as f:
        reads = f.readlines()

    # acquire a named lock so tasks do not write to the file at the same time
    with Lock("all_reads.txt"):
        # careful: the file is opened in append mode, so output accumulates across runs
        with open("all_reads.txt", "a") as f:
            f.writelines([read.decode("utf-8") for read in reads])
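
A minimal driver sketch for this second option, assuming the Client and read_files from the question are in scope (the distributed Lock requires a running client):

import dask

tasks = [get_gzip_delayed(gzip_file) for gzip_file in read_files]
dask.compute(tasks)  # blocks until every task has appended its chunk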

Note that if memory is a hard constraint, then the above can also be refactored to process each file line-by-line (at the cost of slower IO), as sketched below.
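
A sketch of that line-by-line variant, under the same assumptions as above; only one decoded line is held in memory at a time:

@delayed
def get_gzip_delayed(gzip_file):
    # open in text mode so gzip handles the utf-8 decoding
    with gzip.open(gzip_file, "rt", encoding="utf-8") as src:
        with Lock("all_reads.txt"):
            with open("all_reads.txt", "a") as dst:
                for line in src:  # stream instead of readlines()
                    dst.write(line)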