如何使用 PySpark 正确导入 CSV 文件
How to properly import CSV files with PySpark
我知道,可以使用以下命令使用 PySpark 为 RDD 加载文件:
sc = spark.sparkContext
someRDD = sc.textFile("some.csv")
或数据帧:
spark.read.options(delimiter=',') \
.csv("some.csv")
我的 file
是一个包含 10 列的 .csv,由 ','
分隔。但是,最后一列包含一些文本,其中也有很多 ","
。按 ","
拆分将导致每行的列大小不同,而且,我没有将整个文本放在一列中。
我只是在寻找一种将 .csv
文件加载到在最后一个索引处具有多个 ","
的数据帧中的好方法。
也许有办法只拆分前 n
列?因为可以保证,文本列之前的所有列仅由一个 ","
分隔。有趣的是,使用 pd.read_csv
不会导致这个问题!到目前为止,我的解决方法是使用
加载文件
csv = pd.read_csv("some.csv", delimiter=",")
csv_to_array = csv.values.tolist()
df = createDataFrame(csv_to_array)
这不是一个很好的解决方案。此外,它不允许我在我的数据框上使用某些模式。
如果您无法更正输入文件,那么您可以尝试将其加载为文本,然后拆分值以获得所需的列。这是一个例子:
输入文件
1,2,3,4,5,6,7,8,9,10,0,12,121
1,2,3,4,5,6,7,8,9,10,0,12,121
读取和解析
from pyspark.sql import functions as F
nb_cols = 5
df = spark.read.text("file.csv")
df = df.withColumn(
"values",
F.split("value", ",")
).select(
*[F.col("values")[i].alias(f"col_{i}") for i in range(nb_cols)],
F.array_join(F.expr(f"slice(values, {nb_cols + 1}, size(values))"), ",").alias(f"col_{nb_cols}")
)
df.show()
#+-----+-----+-----+-----+-----+-------------------+
#|col_0|col_1|col_2|col_3|col_4| col_5|
#+-----+-----+-----+-----+-----+-------------------+
#| 1| 2| 3| 4| 5|6,7,8,9,10,0,12,121|
#| 1| 2| 3| 4| 5|6,7,8,9,10,0,12,121|
#+-----+-----+-----+-----+-----+-------------------+
我知道,可以使用以下命令使用 PySpark 为 RDD 加载文件:
sc = spark.sparkContext
someRDD = sc.textFile("some.csv")
或数据帧:
spark.read.options(delimiter=',') \
.csv("some.csv")
我的 file
是一个包含 10 列的 .csv,由 ','
分隔。但是,最后一列包含一些文本,其中也有很多 ","
。按 ","
拆分将导致每行的列大小不同,而且,我没有将整个文本放在一列中。
我只是在寻找一种将 .csv
文件加载到在最后一个索引处具有多个 ","
的数据帧中的好方法。
也许有办法只拆分前 n
列?因为可以保证,文本列之前的所有列仅由一个 ","
分隔。有趣的是,使用 pd.read_csv
不会导致这个问题!到目前为止,我的解决方法是使用
csv = pd.read_csv("some.csv", delimiter=",")
csv_to_array = csv.values.tolist()
df = createDataFrame(csv_to_array)
这不是一个很好的解决方案。此外,它不允许我在我的数据框上使用某些模式。
如果您无法更正输入文件,那么您可以尝试将其加载为文本,然后拆分值以获得所需的列。这是一个例子:
输入文件
1,2,3,4,5,6,7,8,9,10,0,12,121
1,2,3,4,5,6,7,8,9,10,0,12,121
读取和解析
from pyspark.sql import functions as F
nb_cols = 5
df = spark.read.text("file.csv")
df = df.withColumn(
"values",
F.split("value", ",")
).select(
*[F.col("values")[i].alias(f"col_{i}") for i in range(nb_cols)],
F.array_join(F.expr(f"slice(values, {nb_cols + 1}, size(values))"), ",").alias(f"col_{nb_cols}")
)
df.show()
#+-----+-----+-----+-----+-----+-------------------+
#|col_0|col_1|col_2|col_3|col_4| col_5|
#+-----+-----+-----+-----+-----+-------------------+
#| 1| 2| 3| 4| 5|6,7,8,9,10,0,12,121|
#| 1| 2| 3| 4| 5|6,7,8,9,10,0,12,121|
#+-----+-----+-----+-----+-----+-------------------+