如何使用 PySpark 正确导入 CSV 文件

How to properly import CSV files with PySpark

我知道,可以使用以下命令使用 PySpark 为 RDD 加载文件:

sc = spark.sparkContext
someRDD = sc.textFile("some.csv")

或数据帧:

spark.read.options(delimiter=',') \
  .csv("some.csv")

我的 file 是一个包含 10 列的 .csv,由 ',' 分隔。但是,最后一列包含一些文本,其中也有很多 ","。按 "," 拆分将导致每行的列大小不同,而且,我没有将整个文本放在一列中。 我只是在寻找一种将 .csv 文件加载到在最后一个索引处具有多个 "," 的数据帧中的好方法。

也许有办法只拆分前 n 列?因为可以保证,文本列之前的所有列仅由一个 "," 分隔。有趣的是,使用 pd.read_csv 不会导致这个问题!到目前为止,我的解决方法是使用

加载文件
csv = pd.read_csv("some.csv", delimiter=",")
csv_to_array = csv.values.tolist()
df = createDataFrame(csv_to_array)

这不是一个很好的解决方案。此外,它不允许我在我的数据框上使用某些模式。

如果您无法更正输入文件,那么您可以尝试将其加载为文本,然后拆分值以获得所需的列。这是一个例子:

输入文件
1,2,3,4,5,6,7,8,9,10,0,12,121
1,2,3,4,5,6,7,8,9,10,0,12,121
读取和解析
from pyspark.sql import functions as F

nb_cols = 5

df = spark.read.text("file.csv")

df = df.withColumn(
    "values",
    F.split("value", ",")
).select(
    *[F.col("values")[i].alias(f"col_{i}") for i in range(nb_cols)],
    F.array_join(F.expr(f"slice(values, {nb_cols + 1}, size(values))"), ",").alias(f"col_{nb_cols}")
)

df.show()
#+-----+-----+-----+-----+-----+-------------------+
#|col_0|col_1|col_2|col_3|col_4|              col_5|
#+-----+-----+-----+-----+-----+-------------------+
#|    1|    2|    3|    4|    5|6,7,8,9,10,0,12,121|
#|    1|    2|    3|    4|    5|6,7,8,9,10,0,12,121|
#+-----+-----+-----+-----+-----+-------------------+