DataFrame Spark 的转换

Transformation of a DataFrame Spark

我想使用具有特定类型字段的镶木地板 table:

name_process:字符串id_session:字符串time_write:字符串键:字符串值:字符串

"id_session" 是 SparkSession 的 ID。

table 被“name_process”列分区

例如:

name_process id_session time_write key value
OtherClass sess000001 1639950466114000 schema0.table0.csv Success
OtherClass sess000002 1639950466214000 schema1.table1.csv Success
OtherClass sess000003 1639950466309000 schema0.table0.csv Success
OtherClass sess000003 1639950466310000 schema1.table1.csv Failure
OtherClass sess000003 1639950466311000 schema2.table2.csv Success
OtherClass sess000003 1639950466312000 schema3.table3.csv Success
ExternalClass sess000004 1639950466413000 schema0.table0.csv Success

“key”列的所有值仅在一个 spark 会话(“id_session”列)中是唯一的。发生这种情况是因为我每次启动 spark 会话时都使用相同的文件 (csv)。我打算将这些文件发送到服务器。发送时间和服务器的响应都将记录在“time_write”和“值”列中。 也就是说,我想查看所有csv文件的最新发送状态。

这是我将与之交互的条目的日志。为了与这个日志交互,我想实现几个方法:

所有 getter 方法都将 return 过滤包含所有列的 DateFrames。也就是说,结果仍然是 5 列。 我在使用 API Spark 时仍然遇到困难。我需要一些时间才能学会如何在 DataFrames 上执行漂亮的操作。 这是我的结果:

abstract class ProcessResultBook(processName: String, onlyPartition: Boolean = true)(implicit spark: SparkSession) {

  val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
  val path      = new Path(s"$pathTable${if(onlyPartition) s"/name_process=$processName" else ""}").toString
  val df        = spark.read.parquet(path)


  def getLastSession: Dataset[Row] = {
    val lastTime        = df.select(max(col("time_write"))).collect()(0)(0).toString
    val lastSession     = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
    val dfByLastSession = df.filter(col("id_session") === lastSession)

    dfByLastSession.show()
/*
+----------+----------------+------------------+-------+
|id_session|      time_write|               key|  value|
+----------+----------------+------------------+-------+
|alskdfksjd|1639950466414000|schema2.table2.csv|Failure|

*/
    dfByLastSession
  }

  def add(df: DataFrame) = ???
  def add(processName: String, idSession: String, timeWrite: String, key: String, value: String) = ???
  def getSessionsByProcess(processName: String) = ???
  def getBySessionAndProcess(processName: String, idSession: String) = ???
  def getUnique(processName: String) = ???
  def delByTime(time: String) = ???
  def delByIdSession(idSession: String) = ???

  def getCurrentTime: SQLTimestamp    = DateTimeUtils.fromMillis(TimeStamp.getCurrentTime.getTime)
  def convertTime(time: Long): String = TimeStamp.getNtpTime(time).getDate.toString
}

我有案例 class:

case class RowProcessResult(
                              nameProcess: String,
                              idSession: String,
                              timeWrite: String,
                              key: String,
                              value: String
                           )

帮助实现2个方法:

方法add(..) 已在hive table.

中添加数据收集

方法getUnique(nameProcess: String): DataFrame。 Returns 一个 DataFrame,其中包含“键”列的唯一值的所有列。对于每个唯一的“关键”值,选择最近的日期。

PS.: 我的测试 class 创建 Hive Table:

def createHiveTable(implicit spark: SparkSession) {

  val schema = "test_schema"
  val table = "test_table"
  val partitionName = "name_process"
  val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil

  spark.sql(s"CREATE DATABASE IF NOT EXISTS test_db")
  //val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"

  val path = new File(".").getAbsolutePath ++ "/src/test/data-lineage/test_data_journal.csv"

  val df = spark.read.option("delimiter", ",")
    .option("header", true)
    .csv(path)

  df.show()

  df.write.mode(SaveMode.Append).partitionBy(partitionName).format("parquet").saveAsTable(s"test_db.$table")

}

好久不见了。我留下我的决定。

import spark.implicits._

  val schema = "test_db"
  val table  = "test_table"
  val df     = spark.read.table(s"$schema.$table").filter(col("name_process") === processName).persist


  def getLastSession: Dataset[Row] = {

    val lastSessionId   = df.select(max(struct(col("time_write"), col("id_session")))("id_session"))
                            .first.getString(0)
    val dfByLastSession = df.filter(col("id_session") === lastSessionId)
    dfByLastSession.show()
    dfByLastSession
  }

  def add(listRows: Seq[RowProcessResult]) = {

    val df = listRows.toDF().withColumn("name_process", lit(processName))
    df.show()
    addDfToTable(df)

  }

  def add(nameProcess: String, idSession: String, timeWrite: String, key: String, value: String) = {
    val df = RowProcessResult(idSession, timeWrite, key, value) :: Nil toDF()
    addDfToTable(df)
  }

  def getSessionsByProcess(externalProcessName: String) = {
    spark.read.table(s"$schema.$table").filter(col("name_process") === externalProcessName)
  }

  def getSession(idSession: String, processName: String = this.processName) = {
    if (processName.equals(this.processName))
      df.filter(col("id_session") === idSession)
    else
      getSessionsByProcess(processName).filter(col("id_session") === idSession)
  }

  def getUnique = df.sort(col("time_write").desc).dropDuplicates("key")

  def addDfToTable(df: DataFrame) =
    df.write.mode(SaveMode.Append).insertInto(s"$schema.$table")

  def getFullDf = df
  def getCurrentTime = TimeStamp.getCurrentTime
  def convertTime(time: Long): String = TimeStamp.getNtpTime(time).getDate.toString
}

我可以得到可以容忍的解决方案。这还不错。谢谢你,新年快乐! =)