Avoid Join in Spark Scala DataFrame
I have to do some calculations based on JSON files located in an Azure Blob Storage folder, and I am working with Apache Spark on Azure HDInsight.
Each folder name carries a number related to a tracking sequence. If a folder with a higher number exists, I have to read the JSON from that folder and discard the folders with lower numbers. For example, if I have folders named 20200501-1 and 20200501-2, I must read 20200501-2.
The solution I found in Apache Spark is to read the paths and add a column to the DataFrame with the source file of each row, like this:
import org.apache.spark.sql.functions.input_file_name

val visits = session.read.schema(schema).json(pathData).withColumn("path", input_file_name())
Using this path column I then do some transformations. However, the transformation involves a join and a groupBy, so when I run the job on the cluster with a large dataset, the Spark job takes a long time. Is a different transformation possible, or can my approach be improved?
My transformation starts from a DataFrame like this (after the column has been added):
val visits = Seq(
  ("ITEM4449", 33, "https://somefolder@some.net/20200514-1/somename.json"),
  ("ITEM4450", 16, "https://somefolder@some.net/20200514-1/somename.json"),
  ("ITEM1111", 88, "https://somefolder@some.net/20200514-2/somename.json"),
  ("ITEM4453", 64, "https://somefolder@some.net/20200514-1/somename.json"),
  ("ITEM1111", 12, "https://somefolder@some.net/20200514-1/somename.json")
).toDF("itemId", "visits", "path")
I apply this transformation:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def discardByTrackingCode(rawDataFrame: DataFrame): DataFrame = {
  // Tracking version is the digit that follows the date in the folder name
  val visitWithColumn = rawDataFrame.
    withColumn("tracking_version",
      expr("substring(path, 38, 1)"))
  // Combined key itemId_version, e.g. ITEM1111_2
  val itemVersionDf = visitWithColumn.
    withColumn("item_version",
      concat(col("ItemId"), lit("_"), col("tracking_version")))
  // Highest version seen for each item
  val versionToTakeDf = itemVersionDf.
    groupBy(col("ItemId").as("item_id_delete")).
    agg(max("item_version").as("item_version"))
  // Join back to keep only the rows belonging to that highest version
  val itemReport = itemVersionDf.join(versionToTakeDf, Seq("item_version"))
  val finalDf = itemReport.select("ItemId", "Visits", "item_version")
  finalDf
}
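Calling it on the sample DataFrame above (a trivial usage sketch, just to show how the table below is produced):

val report = discardByTrackingCode(visits)
report.show(false)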
And I get the following DataFrame, which is correct:
+--------+------+------------+
|ItemId |Visits|item_version|
+--------+------+------------+
|ITEM4449|33 |ITEM4449_1 |
|ITEM4450|16 |ITEM4450_1 |
|ITEM1111|88 |ITEM1111_2 |
|ITEM4453|64 |ITEM4453_1 |
+--------+------+------------+
Is there a more efficient way to make this function work? Apart from that, would it be possible (or preferable) to use the Hadoop FileSystem class to find the folders?
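To illustrate that last idea, a minimal sketch of listing the tracking folders with Hadoop's FileSystem class and keeping only the highest-numbered folder per date could look like the following (the container URL and the variable names are placeholders, not my actual configuration):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Placeholder container URL pointing at the Azure Blob Storage account
val basePath = "wasbs://somefolder@some.blob.core.windows.net/"
val fs = FileSystem.get(new URI(basePath), session.sparkContext.hadoopConfiguration)

// List the date-numbered folders, e.g. 20200501-1, 20200501-2, ...
val folders = fs.listStatus(new Path(basePath))
  .filter(_.isDirectory)
  .map(_.getPath.getName)

// For each date keep only the folder with the highest tracking number
val latestFolders = folders
  .groupBy(_.split("-")(0))
  .map { case (_, names) => names.maxBy(_.split("-")(1).toInt) }
  .map(name => basePath + name)
  .toSeq

The resulting paths could then be passed straight to session.read.json, so the lower-numbered folders would never be read at all.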
You can try using a Window expression:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// One row per item: partition by the item number and rank the file versions, newest first
val window = Window.partitionBy("itemidnumber").orderBy(desc("fileversion"))

val visits = Seq(
  ("ITEM4449", 33, "https://somefolder@some.net/20200514-1/somename.json"),
  ("ITEM4450", 16, "https://somefolder@some.net/20200514-1/somename.json"),
  ("ITEM1111", 88, "https://somefolder@some.net/20200514-2/somename.json"),
  ("ITEM4453", 64, "https://somefolder@some.net/20200514-1/somename.json"),
  ("ITEM1111", 12, "https://somefolder@some.net/20200514-1/somename.json"))
  .toDF("itemId", "visits", "path")  // toDF and $ need the SparkSession implicits (import spark.implicits._)
  .withColumn("itemidnumber", expr("substring(itemId, 5, 4)"))  // numeric part of the item id
  .withColumn("fileversion", expr("substring(path, 38, 1)"))    // tracking number after the date in the folder name
  .withColumn("tracking_version", expr("concat(itemidnumber, substring(path, 38, 1))"))
  .withColumn("row_number", row_number().over(window))
  .filter($"row_number" === 1)  // keep only the latest version of each item

display(visits) // display() is a notebook helper; visits.show(false) works outside Databricks
Output:
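On the sample data the filter keeps one row per item: ITEM1111 survives only with the 88-visits row coming from the 20200514-2 folder, while the other items keep their single row. Compared with the groupBy plus join in discardByTrackingCode, this needs only one shuffle on the window's partition key, which is usually cheaper on large datasets.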