使用 pyspark 在循环中附加 Spark DataFrame 的有效方法
Efficient way of appending Spark DataFrames in a loop using pyspark
我有'|'分隔的巨大文本文件,我想合并所有文本文件并创建一个巨大的 spark 数据框,稍后将使用 pyspark 将其用于 ETL 过程。
低效的方式
1) 创建一个空的 spark 数据帧,df
2) 在循环中,读取关于 spark dataframe df1 的文本文件并将其附加到空 spark dataframe df
df = spark.createDataFrame([],schema)
for x in os.listdir(textfiles_dir):
filepath = '{}/{}'.format(textfiles_dir,x)
df1 = spark.read.format("csv") \
.option("header", "true") \
.option("delimiter", "|") \
.option("inferSchema","true") \
.load(filepath)
df = df.union(df1)
这不是一种有效的火花方式。
任何人都可以提出一个有效的方法吗?
如果用示例代码来解释那就太好了。
谢谢:)
- df1 = spark.read。 ... .load("pathFolder/") - 读取文件夹
中的所有文件
- df1 另存为 table 数据库或文件
正如其他人所指出的,您需要将整个文本文件目录作为数据框读取,而不是迭代读取每个单独的目录:
df = spark.read.format("csv") \
.option("header", "true") \
.option("delimiter", "|") \
.option("inferSchema","true") \
.load(textfiles_dir)
如果你真的想走联合路线,我建议在 SparkContext (http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=union#pyspark.SparkContext.union) 中使用 union
函数而不是 DataFrame 中的联合函数:
dfs = []
for x in os.listdir(textfiles_dir):
filepath = '{}/{}'.format(textfiles_dir,x)
df1 = spark.read.format("csv") \
.option("header", "true") \
.option("delimiter", "|") \
.option("inferSchema","true") \
.load(filepath)
dfs.append(df1)
df = spark.sparkContext.union(dfs)
filepath = 存在多个文件的目录的文件路径
dataframe = spark.read.format("csv").option("header", "true").option("delimiter", "|").load(文件路径)
我有'|'分隔的巨大文本文件,我想合并所有文本文件并创建一个巨大的 spark 数据框,稍后将使用 pyspark 将其用于 ETL 过程。
低效的方式
1) 创建一个空的 spark 数据帧,df
2) 在循环中,读取关于 spark dataframe df1 的文本文件并将其附加到空 spark dataframe df
df = spark.createDataFrame([],schema)
for x in os.listdir(textfiles_dir):
filepath = '{}/{}'.format(textfiles_dir,x)
df1 = spark.read.format("csv") \
.option("header", "true") \
.option("delimiter", "|") \
.option("inferSchema","true") \
.load(filepath)
df = df.union(df1)
这不是一种有效的火花方式。
任何人都可以提出一个有效的方法吗? 如果用示例代码来解释那就太好了。
谢谢:)
- df1 = spark.read。 ... .load("pathFolder/") - 读取文件夹 中的所有文件
- df1 另存为 table 数据库或文件
正如其他人所指出的,您需要将整个文本文件目录作为数据框读取,而不是迭代读取每个单独的目录:
df = spark.read.format("csv") \
.option("header", "true") \
.option("delimiter", "|") \
.option("inferSchema","true") \
.load(textfiles_dir)
如果你真的想走联合路线,我建议在 SparkContext (http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=union#pyspark.SparkContext.union) 中使用 union
函数而不是 DataFrame 中的联合函数:
dfs = []
for x in os.listdir(textfiles_dir):
filepath = '{}/{}'.format(textfiles_dir,x)
df1 = spark.read.format("csv") \
.option("header", "true") \
.option("delimiter", "|") \
.option("inferSchema","true") \
.load(filepath)
dfs.append(df1)
df = spark.sparkContext.union(dfs)
filepath = 存在多个文件的目录的文件路径
dataframe = spark.read.format("csv").option("header", "true").option("delimiter", "|").load(文件路径)