How to create a parquet table in Scala?

I want to create a parquet table with fields of specific types:

name_process: String
id_session: Integer
time_write: LocalDate or Timestamp
key: String
value: String

name_process id_session time_write key value
OtherClass jsdfsadfsf 43434883477 schema0.table0.csv Success
OtherClass jksdfkjhka 23212123323 schema1.table1.csv Success
OtherClass alskdfksjd 23343212234 schema2.table2.csv Failure
ExternalClass sdfjkhsdfd 34455453434 schema3.table3.csv Success
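
In Spark terms, that column list could be declared as an explicit schema, for example (a sketch only: time_write is modelled as a timestamp here, and note that the sample id_session values above look like strings rather than integers):

import org.apache.spark.sql.types._

// Explicit schema matching the field list above; swap TimestampType for DateType
// if a LocalDate is preferred for time_write.
val journalSchema = StructType(Seq(
  StructField("name_process", StringType),
  StructField("id_session",   IntegerType),
  StructField("time_write",   TimestampType),
  StructField("key",          StringType),
  StructField("value",        StringType)
))

Passing this to spark.read.schema(journalSchema) when loading the CSV gives the columns the right types before the parquet write.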

I want to write such a table correctly, with the correct data types, and then read individual partitions back from it. I am trying to implement both the write and the read, but so far the results have been poor.

// Imports used by both functions in this file.
import java.io.File

import org.apache.spark.sql.functions.{col, max}
import org.apache.spark.sql.types.{LongType, StringType}
import org.apache.spark.sql.{SaveMode, SparkSession}

def createHiveTable(implicit spark: SparkSession): Unit = {

  val schema = "test_schema"
  val table = "test_table"
  val partitionName = "name_process"
  val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil

  spark.sql(s"CREATE DATABASE IF NOT EXISTS test_db")
  //val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"

  val path = new File(".").getAbsolutePath ++ "/src/test/data-lineage/test_data_journal.csv"

  // Without an explicit schema (and with inferSchema off) every column is read as a string.
  val df = spark.read.option("delimiter", ",")
    .option("header", true)
    .csv(path)

  df.show()

  // partitionBy moves name_process out of the data files and into the directory
  // names (.../name_process=<value>/); it is no longer a column inside the parquet files.
  df.write.mode(SaveMode.Append).partitionBy(partitionName).format("parquet").saveAsTable(s"test_db.$table")

}
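
If the table should instead be created up front with explicit column types, as the commented-out createTableSql line intends, the DDL could look roughly like the sketch below (an assumption-laden sketch, not the code above: it presumes Hive support is enabled on the SparkSession, and the partition column goes only into PARTITIONED BY, not into the main column list):

// Sketch of the CREATE TABLE the commented-out line seems to aim for.
spark.sql(
  """CREATE TABLE IF NOT EXISTS test_db.test_table (
    |  id_session INT,
    |  time_write TIMESTAMP,
    |  key        STRING,
    |  value      STRING
    |) PARTITIONED BY (name_process STRING)
    |STORED AS PARQUET""".stripMargin)

With the table created this way, df.write.insertInto("test_db.test_table") can append into it; columns are matched by position, with the partition column last, and dynamic-partition settings may need to be enabled.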

def getLastSession(processName: String)(implicit spark: SparkSession): Unit = {

  // Read just the requested partition's directory and cast the columns to the
  // intended types (this is the read that fails, see the log below).
  val df = spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")
    .select(
      col("name_process").cast(StringType),
      col("id_session").cast(StringType),
      col("time_write").cast(LongType),
      col("key").cast(StringType),
      col("value").cast(StringType)
    )

  val lastTime = df.select(col("time_write")).select(max("time_write")).collect()(0).get(0)
  val lastSession = df.filter(col("time_write").equalTo(lastTime)).select("id_session").head().getString(0)

  println(lastSession)
  println(TimeStamp.getCurrentTime)
}

Log from Spark:

21/12/16 14:51:19 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage finished
21/12/16 14:51:19 INFO DAGScheduler: Job 3 finished: parquet at DataLineageJournal.scala:28, took 0,076899 s
 
org.apache.spark.sql.AnalysisException: cannot resolve '`name_process`' given input columns: [id_session, key, time_write, value];
'Project [unresolvedalias(cast('name_process as string), None), cast(id_session#78 as string) AS id_session#86, cast(time_write#79 as bigint) AS time_write#87L, cast(key#80 as string) AS key#88, cast(value#81 as string) AS value#89]
+- Relation[id_session#78,time_write#79,key#80,value#81] parquet
 
 
                at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
                at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis.applyOrElse(CheckAnalysis.scala:155)
                at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis.applyOrElse(CheckAnalysis.scala:152)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp(TreeNode.scala:342)
                at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
                at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:342)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp(TreeNode.scala:339)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren(TreeNode.scala:408)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
                at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:339)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp(TreeNode.scala:339)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren(TreeNode.scala:408)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)

Problem

When you do this:

spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")

you are reading from one specific partition directory. Because partitionBy stores the partition column in the directory name (name_process=...) rather than inside the parquet files, the DataFrame you get back has no name_process column, which is exactly what the AnalysisException is complaining about.

Solution:

You can instead read the table root, so that Spark discovers the partition column from the directory names, and then filter on it (in Scala the comparison on a Column is ===):

spark.read.parquet("spark-warehouse/test_db.db/test_table").filter(col("name_process") === processName)
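
Since the data was registered with saveAsTable, the table can also be read by name rather than by warehouse path. Below is a minimal sketch of getLastSession along those lines (assuming the table and column names above; filtering on the partition column lets Spark prune partitions, so only the requested name_process directory is scanned):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max}

def getLastSession(processName: String)(implicit spark: SparkSession): String = {
  // Read the whole table and filter on the partition column; partition pruning
  // limits the scan to the name_process=<processName> directory.
  val df = spark.table("test_db.test_table")
                .filter(col("name_process") === processName)

  // Latest write time, then the session id that produced it.
  val lastTime = df.agg(max("time_write")).head().get(0)

  df.filter(col("time_write") === lastTime)
    .select("id_session")
    .head()
    .getString(0)
}

If reading a single partition directory by path is really required, the basePath read option tells Spark where partition discovery should start, so the partition column is recovered from the directory name:

spark.read.option("basePath", "spark-warehouse/test_db.db/test_table")
          .parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")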