在 Pyspark 中读取 tar.gz 存档时使用特定模式过滤文件
Filtering files using specific pattern when reading tar.gz archive in Pyspark
我的文件夹 myfolder.tar.gz
中有多个 CSV 文件。我以这种方式创建的:首先将我所有的文件放在一个文件夹名称 myfolder
中,然后准备一个 tar
文件夹。然后准备 tar 文件夹的 .gz
。
假设我们有 5 个文件。
abc_1.csv
abc_2.csv
abc_3.csv
def_1.csv
def_2.csv
我想使用 Pyspark 数据框以特定文件名模式过滤读取的文件。就像我们想一起阅读所有 abc
个文件。
这不应该给我们 def
的结果,反之亦然。目前,我只需使用 spark.read.csv()
函数就可以一起读取所有 CSV 文件。此外,当我使用 pathGlobalFilter
参数将文件保存在一个简单的文件夹中时,我可以过滤文件,如下所示:
df = spark.read.csv("mypath",pathGlobalFilter="def_[1-9].csv")
但是当我能够在 tar.gz
中做同样的事情时,例如:
df = spark.read.csv("myfolder.tar.gz", pathGlobalFilter="def_[1-9].csv")
我收到一个错误:
Unable to infer Schema for CSV. How to read from .tar.gz file.
基于此 ,您可以将 .tar.gz
文件读取为 binaryFile
然后使用 python tarfile
您可以提取存档成员并过滤在文件名上使用正则表达式 def_[1-9]
。结果是一个 rdd,您可以将其转换为数据框:
import re
import tarfile
from io import BytesIO
# extract only the files with which math regex 'def_[1-9].csv'
def extract_files(bytes):
tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
return [tar.extractfile(x).read() for x in tar if re.match(r"def_[1-9].csv", x.name)]
# read binary file and convert to df
rdd = sc.binaryFiles("/path/myfolder.tar.gz") \
.mapValues(extract_files) \
.flatMap(lambda row: [x.decode("utf-8").split("\n") for x in row[1]])\
.flatMap(lambda row: [e.split(",") for e in row])
df = rdd.toDF(*csv_cols)
我的文件夹 myfolder.tar.gz
中有多个 CSV 文件。我以这种方式创建的:首先将我所有的文件放在一个文件夹名称 myfolder
中,然后准备一个 tar
文件夹。然后准备 tar 文件夹的 .gz
。
假设我们有 5 个文件。
abc_1.csv
abc_2.csv
abc_3.csv
def_1.csv
def_2.csv
我想使用 Pyspark 数据框以特定文件名模式过滤读取的文件。就像我们想一起阅读所有 abc
个文件。
这不应该给我们 def
的结果,反之亦然。目前,我只需使用 spark.read.csv()
函数就可以一起读取所有 CSV 文件。此外,当我使用 pathGlobalFilter
参数将文件保存在一个简单的文件夹中时,我可以过滤文件,如下所示:
df = spark.read.csv("mypath",pathGlobalFilter="def_[1-9].csv")
但是当我能够在 tar.gz
中做同样的事情时,例如:
df = spark.read.csv("myfolder.tar.gz", pathGlobalFilter="def_[1-9].csv")
我收到一个错误:
Unable to infer Schema for CSV. How to read from .tar.gz file.
基于此 .tar.gz
文件读取为 binaryFile
然后使用 python tarfile
您可以提取存档成员并过滤在文件名上使用正则表达式 def_[1-9]
。结果是一个 rdd,您可以将其转换为数据框:
import re
import tarfile
from io import BytesIO
# extract only the files with which math regex 'def_[1-9].csv'
def extract_files(bytes):
tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
return [tar.extractfile(x).read() for x in tar if re.match(r"def_[1-9].csv", x.name)]
# read binary file and convert to df
rdd = sc.binaryFiles("/path/myfolder.tar.gz") \
.mapValues(extract_files) \
.flatMap(lambda row: [x.decode("utf-8").split("\n") for x in row[1]])\
.flatMap(lambda row: [e.split(",") for e in row])
df = rdd.toDF(*csv_cols)