Spark:加载多个文件,执行相同的操作并合并到一个数据帧中
Spark: Load multiple files, perform same operation and merge into a single dataFrame
我有很多小的、单独的 .txt 文件。对于这些文件中的每一个,我都有多行被 space 分成 2 列,start_time 和 end_time(浮点数)。
我愿意:
- 加载所有 .txt 文件
- 为每一行计算一个包含 (end_time - start_time)
的新列
- 为每一行添加一个包含文件名的新列
- 最后,我想获得一个具有此架构的数据帧:
+------------+--------------+------------+------------+
| file_name | start_time | end_time | duration |
+------------+--------------+------------+------------+
我知道我可以简单地为每个文件和每一行创建一个循环,并一次向数据帧添加一行,但我想知道是否有更快的方法来执行此操作。
我对事情完成的顺序不感兴趣,但对最终结果的速度感兴趣。
我看到 SparkContext 中提供了 textFile() 和 wholeTextFiles() 等现有函数,但我不知道如何使用它们来获取做我想做的。
非常感谢任何指导或建议!
(抱歉我的英语不好)
更新:
感谢@Shu 的帮助,这是我解决问题的最终代码
from pyspark.sql.functions import split, reverse, input_file_name
original_schema = [StructField("Start", FloatType(), True),
StructField("End", FloatType(), True)]
data_structure = StructType(original_schema)
df = self.spark_session.read.\
csv(path=PATH_FILES+'\*.txt', header=False, schema=data_structure, sep='\t').\
withColumn("Filename", reverse(split(input_file_name(), "/")).getItem(0) ).\
withColumn("duration", col("End") - col("Start"))
df.show(20, False)
使用 spark.read.csv()
读取文件,如果您的列由 space
分隔,则使用 .option("delimiter"," ")
.
- 使用
input_file_name
函数获取文件名。
示例:
from pyspark.sql.functions import *
spark.read.option("header",true).\
option("delimiter"," ").\
csv("<path>").\
withColumn("file_name",input_file_name).\
withColumn("duration",col("end_time") - col("start_time")).show()
如果行由 space
分隔,则使用文件中不存在的分隔符读取数据。
然后用\s+
拆分数据并展开现在我们将数据放入数据帧的行中。
使用 substring 函数提取 start_time,end_time
并减去它们以获得持续时间。
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\s+"))).\
withColumn("filename",input_file_name()).\
drop("_c0").\
show()
UPDATE
Using array index:
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\s+"))).\
withColumn("filename",reverse(split(input_file_name(),'/'))[0]).\
drop("_c0").\
show()
#or
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\s+"))).\
withColumn("filename",reverse(split(input_file_name(),'/')).getItem(0)).\
drop("_c0").\
show()
From Spark-2.4+ Using element_at:
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\s+"))).\
withColumn("filename",element_at(split(input_file_name(),'/'),-1)).\
drop("_c0").\
show()
Scala 中的另一种类似方法 -
使用 spark.read.csv() 读取文件,分隔符为 space 并将文件名命名为
(假设 spark -> spark 会话已经存在)
val inputDF = spark.read
.option("inferSchema", "true")
.option("delimiter", " ")
.csv("<path>")
.toDF("start_time","end_time")
val output = inputDF
.withColumn("duration", col("end_time") - col("start_time"))
.withColumn("input_file_name", input_file_name())
.withColumn("file_name_splits", split(col("input_file_name"), "/"))
// Getting the last element from the splits using size function
.withColumn("file_name", col("file_name_splits").apply(size(col("file_name_splits")).minus(1)))
.select("file_name", "start_time", "end_time", "duration")
// To show the sample data
output.show(false)
我有很多小的、单独的 .txt 文件。对于这些文件中的每一个,我都有多行被 space 分成 2 列,start_time 和 end_time(浮点数)。
我愿意:
- 加载所有 .txt 文件
- 为每一行计算一个包含 (end_time - start_time) 的新列
- 为每一行添加一个包含文件名的新列
- 最后,我想获得一个具有此架构的数据帧:
+------------+--------------+------------+------------+
| file_name | start_time | end_time | duration |
+------------+--------------+------------+------------+
我知道我可以简单地为每个文件和每一行创建一个循环,并一次向数据帧添加一行,但我想知道是否有更快的方法来执行此操作。
我对事情完成的顺序不感兴趣,但对最终结果的速度感兴趣。
我看到 SparkContext 中提供了 textFile() 和 wholeTextFiles() 等现有函数,但我不知道如何使用它们来获取做我想做的。
非常感谢任何指导或建议!
(抱歉我的英语不好)
更新:
感谢@Shu 的帮助,这是我解决问题的最终代码
from pyspark.sql.functions import split, reverse, input_file_name
original_schema = [StructField("Start", FloatType(), True),
StructField("End", FloatType(), True)]
data_structure = StructType(original_schema)
df = self.spark_session.read.\
csv(path=PATH_FILES+'\*.txt', header=False, schema=data_structure, sep='\t').\
withColumn("Filename", reverse(split(input_file_name(), "/")).getItem(0) ).\
withColumn("duration", col("End") - col("Start"))
df.show(20, False)
使用 spark.read.csv()
读取文件,如果您的列由 space
分隔,则使用 .option("delimiter"," ")
.
- 使用
input_file_name
函数获取文件名。
示例:
from pyspark.sql.functions import *
spark.read.option("header",true).\
option("delimiter"," ").\
csv("<path>").\
withColumn("file_name",input_file_name).\
withColumn("duration",col("end_time") - col("start_time")).show()
如果行由 space
分隔,则使用文件中不存在的分隔符读取数据。
然后用
\s+
拆分数据并展开现在我们将数据放入数据帧的行中。使用 substring 函数提取
start_time,end_time
并减去它们以获得持续时间。
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\s+"))).\
withColumn("filename",input_file_name()).\
drop("_c0").\
show()
UPDATE
Using array index:
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\s+"))).\
withColumn("filename",reverse(split(input_file_name(),'/'))[0]).\
drop("_c0").\
show()
#or
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\s+"))).\
withColumn("filename",reverse(split(input_file_name(),'/')).getItem(0)).\
drop("_c0").\
show()
From Spark-2.4+ Using element_at:
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\s+"))).\
withColumn("filename",element_at(split(input_file_name(),'/'),-1)).\
drop("_c0").\
show()
Scala 中的另一种类似方法 - 使用 spark.read.csv() 读取文件,分隔符为 space 并将文件名命名为 (假设 spark -> spark 会话已经存在)
val inputDF = spark.read
.option("inferSchema", "true")
.option("delimiter", " ")
.csv("<path>")
.toDF("start_time","end_time")
val output = inputDF
.withColumn("duration", col("end_time") - col("start_time"))
.withColumn("input_file_name", input_file_name())
.withColumn("file_name_splits", split(col("input_file_name"), "/"))
// Getting the last element from the splits using size function
.withColumn("file_name", col("file_name_splits").apply(size(col("file_name_splits")).minus(1)))
.select("file_name", "start_time", "end_time", "duration")
// To show the sample data
output.show(false)