在 Pyspark 中读取 tar.gz 存档时使用特定模式过滤文件

Filtering files using specific pattern when reading tar.gz archive in Pyspark

我的文件夹 myfolder.tar.gz 中有多个 CSV 文件。我以这种方式创建的:首先将我所有的文件放在一个文件夹名称 myfolder 中,然后准备一个 tar 文件夹。然后准备 tar 文件夹的 .gz

假设我们有 5 个文件。

abc_1.csv
abc_2.csv
abc_3.csv
def_1.csv
def_2.csv

我想使用 Pyspark 数据框以特定文件名模式过滤读取的文件。就像我们想一起阅读所有 abc 个文件。

这不应该给我们 def 的结果,反之亦然。目前,我只需使用 spark.read.csv() 函数就可以一起读取所有 CSV 文件。此外,当我使用 pathGlobalFilter 参数将文件保存在一个简单的文件夹中时,我可以过滤文件,如下所示:

df = spark.read.csv("mypath",pathGlobalFilter="def_[1-9].csv")

但是当我能够在 tar.gz 中做同样的事情时,例如:

df = spark.read.csv("myfolder.tar.gz", pathGlobalFilter="def_[1-9].csv")

我收到一个错误:

Unable to infer Schema for CSV. How to read from .tar.gz file.

基于此 ,您可以将 .tar.gz 文件读取为 binaryFile 然后使用 python tarfile 您可以提取存档成员并过滤在文件名上使用正则表达式 def_[1-9]。结果是一个 rdd,您可以将其转换为数据框:

import re
import tarfile
from io import BytesIO

# extract only the files with which math regex 'def_[1-9].csv'
def extract_files(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if re.match(r"def_[1-9].csv", x.name)]

# read binary file and convert to df
rdd = sc.binaryFiles("/path/myfolder.tar.gz") \
        .mapValues(extract_files) \
        .flatMap(lambda row: [x.decode("utf-8").split("\n") for x in row[1]])\
        .flatMap(lambda row: [e.split(",") for e in row])

df = rdd.toDF(*csv_cols)