使用 pyspark 在循环中附加 Spark DataFrame 的有效方法

Efficient way of appending Spark DataFrames in a loop using pyspark

我有'|'分隔的巨大文本文件,我想合并所有文本文件并创建一个巨大的 spark 数据框,稍后将使用 pyspark 将其用于 ETL 过程。

低效的方式

1) 创建一个空的 spark 数据帧,df

2) 在循环中,读取关于 spark dataframe df1 的文本文件并将其附加到空 spark dataframe df

df = spark.createDataFrame([],schema)
for x in os.listdir(textfiles_dir):
    filepath = '{}/{}'.format(textfiles_dir,x)
    df1 = spark.read.format("csv") \
                    .option("header", "true") \
                    .option("delimiter", "|") \
                    .option("inferSchema","true") \
                    .load(filepath)
    df = df.union(df1)

这不是一种有效的火花方式。

任何人都可以提出一个有效的方法吗? 如果用示例代码来解释那就太好了。

谢谢:)

  1. df1 = spark.read。 ... .load("pathFolder/") - 读取文件夹
  2. 中的所有文件
  3. df1 另存为 table 数据库或文件

正如其他人所指出的,您需要将整个文本文件目录作为数据框读取,而不是迭代读取每个单独的目录:

df = spark.read.format("csv") \
                    .option("header", "true") \
                    .option("delimiter", "|") \
                    .option("inferSchema","true") \
                    .load(textfiles_dir)

如果你真的想走联合路线,我建议在 SparkContext (http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=union#pyspark.SparkContext.union) 中使用 union 函数而不是 DataFrame 中的联合函数:

dfs = []
for x in os.listdir(textfiles_dir):
   filepath = '{}/{}'.format(textfiles_dir,x)
   df1 = spark.read.format("csv") \
                .option("header", "true") \
                .option("delimiter", "|") \
                .option("inferSchema","true") \
                .load(filepath)
   dfs.append(df1)
df = spark.sparkContext.union(dfs)

filepath = 存在多个文件的目录的文件路径

dataframe = spark.read.format("csv").option("header", "true").option("delimiter", "|").load(文件路径)