Databricks pyspark并行解压缩多个文件
Databricks pyspark parallelize unzipping multiple files
我正在尝试在 Databricks 的 pyspark
中并行解压缩存储在 s3
中的文件。在 for
循环中解压是这样的:
file_list = [(file.path, file.name) for file in dbutils.fs.ls(data_path) if os.path.basename(file.path).endswith(".zip")] # data_path is taken as a parameter
file_names = [ff[1] for ff in file_list]
for ff in file_list:
dbutils.fs.cp(ff[0], "/FileStore/tmp/" + ff[1])
cmd = 'unzip /dbfs/FileStore/tmp/' + ff[1]
os.system(cmd)
dbutils.fs.cp("file:/databricks/driver/" + ff[1], data_path)
dbutils.fs.rm("file:/databricks/driver/" + ff[1])
我正在尝试并行化解压缩部分。所以在将文件复制到 "/FileStore/tmp/"
后,我是 运行:
unzips = [file[1] for file in file_list]
def f(x):
os.system('unzip /dbfs/FileStore/tmp/' + x)
sc.parallelize(unzips).foreach(f)
作业运行但文件未解压到任何地方。
只需添加 -d
选项即可将输出放入 DBFS,如下所示:
def f(x):
os.system('unzip -o -d /dbfs/FileStore/tmp-output/ /dbfs/FileStore/tmp/' + x)
如有必要(例如,存档中没有目录),为每个 运行 添加更多唯一目录,例如,向其附加 x
。
我正在尝试在 Databricks 的 pyspark
中并行解压缩存储在 s3
中的文件。在 for
循环中解压是这样的:
file_list = [(file.path, file.name) for file in dbutils.fs.ls(data_path) if os.path.basename(file.path).endswith(".zip")] # data_path is taken as a parameter
file_names = [ff[1] for ff in file_list]
for ff in file_list:
dbutils.fs.cp(ff[0], "/FileStore/tmp/" + ff[1])
cmd = 'unzip /dbfs/FileStore/tmp/' + ff[1]
os.system(cmd)
dbutils.fs.cp("file:/databricks/driver/" + ff[1], data_path)
dbutils.fs.rm("file:/databricks/driver/" + ff[1])
我正在尝试并行化解压缩部分。所以在将文件复制到 "/FileStore/tmp/"
后,我是 运行:
unzips = [file[1] for file in file_list]
def f(x):
os.system('unzip /dbfs/FileStore/tmp/' + x)
sc.parallelize(unzips).foreach(f)
作业运行但文件未解压到任何地方。
只需添加 -d
选项即可将输出放入 DBFS,如下所示:
def f(x):
os.system('unzip -o -d /dbfs/FileStore/tmp-output/ /dbfs/FileStore/tmp/' + x)
如有必要(例如,存档中没有目录),为每个 运行 添加更多唯一目录,例如,向其附加 x
。