DataFrame Spark 的优化查询
Optimization query for DataFrame Spark
我尝试从 Hive 创建 DataFrame table。但我不擅长使用 Spark API。
我需要帮助来优化方法 getLastSession
中的查询,将两个任务合并为一个 spark 任务:
val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
val path = new Path(s"$pathTable${if(onlyPartition) s"/name_process=$processName" else ""}").toString
val df = spark.read.parquet(path)
def getLastSession: Dataset[Row] = {
val lastTime = df.select(max(col("time_write"))).collect()(0)(0).toString
val lastSession = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
val dfByLastSession = df.filter(col("id_session") === lastSession)
dfByLastSession.show()
/*
+----------+----------------+------------------+-------+
|id_session| time_write| key| value|
+----------+----------------+------------------+-------+
|alskdfksjd|1639950466414000|schema2.table2.csv|Failure|
*/
dfByLastSession
}
PS。我的来源 Table(例如):
name_process
id_session
time_write
key
value
OtherClass
jsdfsadfsf
43434883477
schema0.table0.csv
Success
OtherClass
jksdfkjhka
23212123323
schema1.table1.csv
Success
OtherClass
alskdfksjd
23343212234
schema2.table2.csv
Failure
ExternalClass
sdfjkhsdfd
34455453434
schema3.table3.csv
Success
您可以像这样将 row_number
与 Window 一起使用:
import org.apache.spark.sql.expressions.Window
val dfByLastSession = df.withColumn(
"rn",
row_number().over(Window.orderBy(desc("time_write")))
).filter("rn=1").drop("rn")
dfByLastSession.show()
但是,由于您不按任何字段进行分区,这可能会降低性能。
您可以在代码中更改的另一件事是使用结构排序通过一个查询获取与最近的 time_write
关联的 id_session
:
val lastSession = df.select(max(struct(col("time_write"), col("id_session")))("id_session")).first.getString(0)
val dfByLastSession = df.filter(col("id_session") === lastSession)
我尝试从 Hive 创建 DataFrame table。但我不擅长使用 Spark API。
我需要帮助来优化方法 getLastSession
中的查询,将两个任务合并为一个 spark 任务:
val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
val path = new Path(s"$pathTable${if(onlyPartition) s"/name_process=$processName" else ""}").toString
val df = spark.read.parquet(path)
def getLastSession: Dataset[Row] = {
val lastTime = df.select(max(col("time_write"))).collect()(0)(0).toString
val lastSession = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
val dfByLastSession = df.filter(col("id_session") === lastSession)
dfByLastSession.show()
/*
+----------+----------------+------------------+-------+
|id_session| time_write| key| value|
+----------+----------------+------------------+-------+
|alskdfksjd|1639950466414000|schema2.table2.csv|Failure|
*/
dfByLastSession
}
PS。我的来源 Table(例如):
name_process | id_session | time_write | key | value |
---|---|---|---|---|
OtherClass | jsdfsadfsf | 43434883477 | schema0.table0.csv | Success |
OtherClass | jksdfkjhka | 23212123323 | schema1.table1.csv | Success |
OtherClass | alskdfksjd | 23343212234 | schema2.table2.csv | Failure |
ExternalClass | sdfjkhsdfd | 34455453434 | schema3.table3.csv | Success |
您可以像这样将 row_number
与 Window 一起使用:
import org.apache.spark.sql.expressions.Window
val dfByLastSession = df.withColumn(
"rn",
row_number().over(Window.orderBy(desc("time_write")))
).filter("rn=1").drop("rn")
dfByLastSession.show()
但是,由于您不按任何字段进行分区,这可能会降低性能。
您可以在代码中更改的另一件事是使用结构排序通过一个查询获取与最近的 time_write
关联的 id_session
:
val lastSession = df.select(max(struct(col("time_write"), col("id_session")))("id_session")).first.getString(0)
val dfByLastSession = df.filter(col("id_session") === lastSession)