Parallel computing for loop with no last function
I am trying to read the contents of 16 gzip files in parallel with a script:
```python
import gzip
import glob

from dask import delayed
from dask.distributed import Client, LocalCluster

@delayed
def get_gzip_delayed(gzip_file):
    with gzip.open(gzip_file) as f:
        reads = f.readlines()
    reads = [read.decode("utf-8") for read in reads]
    return reads

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    read_files = glob.glob("*.txt.gz")
    all_files = []
    for file in read_files:
        reads = get_gzip_delayed(file)
        all_files.extend(reads)

    with open("all_reads.txt", "w") as f:
        w = delayed(all_files.writelines)(f)
        w.compute()
```
However, I get the following error:

> TypeError: Delayed objects of unspecified length are not iterable

How can I parallelize a for loop that uses extend/append and writes the result to a file? All the dask examples always include some final function applied to the product of the for loop.
The list `all_files` consists of delayed values, and calling `delayed(f.writelines)(all_files)` (note the argument differs from the code in the question) won't work for several reasons, the main one being that you prepare the lazy instruction for the write, but it would only execute after the file has been closed.

There are at least two ways to solve this:
- If the data in the files fits into memory, the simplest option is to build `all_files` with `append` (so it is a list of delayed values, one per file), compute it, and write the result:

```python
import dask

# dask.compute returns a tuple, so unpack the single result
(all_files,) = dask.compute(all_files)

with open("all_reads.txt", "w") as f:
    for reads in all_files:  # each element is one file's list of decoded lines
        f.writelines(reads)
```
- If the data does not fit into memory, another option is to move the writing into the `get_gzip_delayed` function, so that the data is never transferred between the workers and the client:

```python
import gzip

from dask import delayed
from dask.distributed import Lock

@delayed
def get_gzip_delayed(gzip_file):
    with gzip.open(gzip_file) as f:
        reads = f.readlines()

    # acquire a lock to prevent other workers from writing at the same time
    with Lock("all_reads.txt"):
        # note the append mode: each task adds to the same shared file
        with open("all_reads.txt", "a") as f:
            f.writelines([read.decode("utf-8") for read in reads])
```
Note that if memory is a hard constraint, the above can also be refactored to process each file line by line (at the cost of slower IO).
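As a rough illustration of that line-by-line refactor (the helper name is an assumption, not from the answer above), the core of the task can be reduced to a plain streaming copy; in the dask version you would decorate it with `@delayed` and keep the `Lock` around the append, as in the snippet above:

```python
import gzip

def append_gzip_lines(gzip_file, out_path):
    # Stream one gzip file into the shared output, one line at a time,
    # so only a single decoded line is ever held in memory.
    with gzip.open(gzip_file, "rt", encoding="utf-8") as src:
        with open(out_path, "a", encoding="utf-8") as dst:
            for line in src:
                dst.write(line)
```

Opening the gzip file in text mode (`"rt"`) also removes the need for the explicit `.decode("utf-8")` step.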