Transformation of a Spark DataFrame
I want to work with a Parquet table whose fields have the following types:
name_process: String, id_session: String, time_write: String, key: String, value: String
"id_session" is the ID of the SparkSession.
The table is partitioned by the "name_process" column.
For example:
| name_process | id_session | time_write | key | value |
|---|---|---|---|---|
| OtherClass | sess000001 | 1639950466114000 | schema0.table0.csv | Success |
| OtherClass | sess000002 | 1639950466214000 | schema1.table1.csv | Success |
| OtherClass | sess000003 | 1639950466309000 | schema0.table0.csv | Success |
| OtherClass | sess000003 | 1639950466310000 | schema1.table1.csv | Failure |
| OtherClass | sess000003 | 1639950466311000 | schema2.table2.csv | Success |
| OtherClass | sess000003 | 1639950466312000 | schema3.table3.csv | Success |
| ExternalClass | sess000004 | 1639950466413000 | schema0.table0.csv | Success |
Values in the "key" column are unique only within a single Spark session (the "id_session" column). This is because I send the same set of csv files to a server every time I start a Spark session; the send time and the server's response are recorded in the "time_write" and "value" columns.
In other words, I want to see the latest send status of every csv file.
This is the log of entries I will interact with. To work with this log, I want to implement several methods.
All getter methods should return filtered DataFrames that keep all columns, i.e. the result still has 5 columns.
I am still struggling with the Spark API; it will take me some time to learn how to perform elegant operations on DataFrames.
Here is what I have so far:
import java.io.File

import org.apache.commons.net.ntp.TimeStamp                           // assumed: Apache Commons Net NTP TimeStamp
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp  // Spark 2.x type alias for Long
import org.apache.spark.sql.functions.{col, max}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

abstract class ProcessResultBook(processName: String, onlyPartition: Boolean = true)(implicit spark: SparkSession) {

  val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
  val path = new Path(s"$pathTable${if (onlyPartition) s"/name_process=$processName" else ""}").toString
  val df = spark.read.parquet(path)

  def getLastSession: Dataset[Row] = {
    val lastTime = df.select(max(col("time_write"))).collect()(0)(0).toString
    val lastSession = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
    val dfByLastSession = df.filter(col("id_session") === lastSession)
    dfByLastSession.show()
    /*
    +----------+----------------+------------------+-------+
    |id_session|      time_write|               key|  value|
    +----------+----------------+------------------+-------+
    |alskdfksjd|1639950466414000|schema2.table2.csv|Failure|
    */
    dfByLastSession
  }

  def add(df: DataFrame) = ???
  def add(processName: String, idSession: String, timeWrite: String, key: String, value: String) = ???
  def getSessionsByProcess(processName: String) = ???
  def getBySessionAndProcess(processName: String, idSession: String) = ???
  def getUnique(processName: String) = ???
  def delByTime(time: String) = ???
  def delByIdSession(idSession: String) = ???

  def getCurrentTime: SQLTimestamp = DateTimeUtils.fromMillis(TimeStamp.getCurrentTime.getTime)
  def convertTime(time: Long): String = TimeStamp.getNtpTime(time).getDate.toString
}
I have a case class:
case class RowProcessResult(
nameProcess: String,
idSession: String,
timeWrite: String,
key: String,
value: String
)
Please help me implement these 2 methods:
- def add(data: List[RowProcessResult]): Unit
- def getUnique(nameProcess: String): DataFrame or List[RowProcessResult]

The add(..) method appends a collection of data to the Hive table.
The method getUnique(nameProcess: String): DataFrame returns a DataFrame with all columns, containing one row per unique value of the "key" column; for each unique "key" value the most recent record is selected (see the sketch below).
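A minimal sketch of the two methods, assuming the table layout produced by createHiveTable below (data columns id_session, time_write, key, value, with name_process as the trailing partition column) and the case class above; getUnique keeps, for every key, the row with the greatest time_write via a window function:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

def add(data: List[RowProcessResult])(implicit spark: SparkSession): Unit = {
  import spark.implicits._
  // insertInto matches columns by position, so rename to the table's column names
  // and move the partition column (name_process) to the end
  data.toDF("name_process", "id_session", "time_write", "key", "value")
    .select("id_session", "time_write", "key", "value", "name_process")
    .write
    .mode(SaveMode.Append)
    .insertInto("test_db.test_table")
}

def getUnique(nameProcess: String)(implicit spark: SparkSession): DataFrame = {
  // one row per key: the most recent record (time_write is a fixed-width microsecond
  // timestamp stored as a string, so string ordering matches numeric ordering)
  val byKeyLatestFirst = Window.partitionBy(col("key")).orderBy(col("time_write").desc)
  spark.read.table("test_db.test_table")
    .filter(col("name_process") === nameProcess)
    .withColumn("rn", row_number().over(byKeyLatestFirst))
    .filter(col("rn") === 1)
    .drop("rn")
}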
P.S.: my test class creates the Hive table like this:
import java.io.File

import org.apache.spark.sql.{SaveMode, SparkSession}

def createHiveTable(implicit spark: SparkSession): Unit = {
  val schema = "test_schema"
  val table = "test_table"
  val partitionName = "name_process"
  val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil
  spark.sql(s"CREATE DATABASE IF NOT EXISTS test_db")
  //val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"
  val path = new File(".").getAbsolutePath ++ "/src/test/data-lineage/test_data_journal.csv"
  val df = spark.read.option("delimiter", ",")
    .option("header", true)
    .csv(path)
  df.show()
  // saveAsTable moves the partition column (name_process) to the end of the stored schema
  df.write.mode(SaveMode.Append).partitionBy(partitionName).format("parquet").saveAsTable(s"test_db.$table")
}
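For reference, the commented-out createTableSql above could be spelled out as explicit DDL (a sketch; the saveAsTable call already creates an equivalent table, so this is only needed if you prefer to create the table up front):

spark.sql(
  """CREATE TABLE IF NOT EXISTS test_db.test_table (
    |  id_session STRING,
    |  time_write STRING,
    |  `key`      STRING,
    |  `value`    STRING
    |)
    |PARTITIONED BY (name_process STRING)
    |STORED AS PARQUET
    |""".stripMargin)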
It has been a while. I am leaving my solution here.
import org.apache.commons.net.ntp.TimeStamp
import org.apache.spark.sql.functions.{col, lit, max, struct}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

// Assumed wrapper class: the original snippet starts inside the class body but references
// processName and spark. It also assumes a RowProcessResult variant without the nameProcess
// field; name_process is filled in from the class parameter or the method argument.
class ProcessResultBook(processName: String)(implicit spark: SparkSession) {

  import spark.implicits._

  val schema = "test_db"
  val table = "test_table"
  val df = spark.read.table(s"$schema.$table").filter(col("name_process") === processName).persist

  def getLastSession: Dataset[Row] = {
    // max over a struct orders by time_write first, so this picks the id_session
    // of the row with the greatest time_write in a single pass
    val lastSessionId = df.select(max(struct(col("time_write"), col("id_session")))("id_session"))
      .first.getString(0)
    val dfByLastSession = df.filter(col("id_session") === lastSessionId)
    dfByLastSession.show()
    dfByLastSession
  }

  def add(listRows: Seq[RowProcessResult]) = {
    // name_process is appended last, matching the column order insertInto expects
    val df = listRows.toDF().withColumn("name_process", lit(processName))
    df.show()
    addDfToTable(df)
  }

  def add(nameProcess: String, idSession: String, timeWrite: String, key: String, value: String) = {
    val df = (RowProcessResult(idSession, timeWrite, key, value) :: Nil).toDF()
      .withColumn("name_process", lit(nameProcess))
    addDfToTable(df)
  }

  def getSessionsByProcess(externalProcessName: String) = {
    spark.read.table(s"$schema.$table").filter(col("name_process") === externalProcessName)
  }

  def getSession(idSession: String, processName: String = this.processName) = {
    if (processName.equals(this.processName))
      df.filter(col("id_session") === idSession)
    else
      getSessionsByProcess(processName).filter(col("id_session") === idSession)
  }

  // for every key keep the most recently written row
  def getUnique = df.sort(col("time_write").desc).dropDuplicates("key")

  def addDfToTable(df: DataFrame) =
    df.write.mode(SaveMode.Append).insertInto(s"$schema.$table")

  def getFullDf = df

  def getCurrentTime = TimeStamp.getCurrentTime

  def convertTime(time: Long): String = TimeStamp.getNtpTime(time).getDate.toString
}
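For completeness, a hypothetical usage from a test, assuming the RowProcessResult variant without nameProcess that the class above expects (the session id and timestamps are made up):

import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder()
  .appName("process-result-book-demo")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

val book = new ProcessResultBook("OtherClass")

// latest status per csv file and the rows of the most recent session
book.getUnique.show()
book.getLastSession.show()

// append two fresh records; note that df was persisted at construction time,
// so a new ProcessResultBook instance is needed to see these rows in the getters
book.add(Seq(
  RowProcessResult("sess000005", "1639950466500000", "schema0.table0.csv", "Success"),
  RowProcessResult("sess000005", "1639950466501000", "schema1.table1.csv", "Failure")
))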
I ended up with a tolerable solution. It's not bad. Thank you, and Happy New Year! =)