How to create a Parquet table in Scala?
I want to create a Parquet table with fields of specific types:

name_process: String
id_session: Int
time_write: LocalDate or Timestamp
key: String
value: String
| name_process  | id_session | time_write  | key                | value   |
|---------------|------------|-------------|--------------------|---------|
| OtherClass    | jsdfsadfsf | 43434883477 | schema0.table0.csv | Success |
| OtherClass    | jksdfkjhka | 23212123323 | schema1.table1.csv | Success |
| OtherClass    | alskdfksjd | 23343212234 | schema2.table2.csv | Failure |
| ExternalClass | sdfjkhsdfd | 34455453434 | schema3.table3.csv | Success |
I want to write such a table correctly, with the right data types, and then read partitions back from it. I am trying to implement the read and the write, but so far the results are bad.
import java.io.File
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, max}
import org.apache.spark.sql.types.{LongType, StringType}

def createHiveTable(implicit spark: SparkSession) {
  val schema = "test_schema"
  val table = "test_table"
  val partitionName = "name_process"
  val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil

  spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
  //val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"

  val path = new File(".").getAbsolutePath ++ "/src/test/data-lineage/test_data_journal.csv"
  val df = spark.read.option("delimiter", ",")
    .option("header", true)
    .csv(path)
  df.show()

  df.write.mode(SaveMode.Append).partitionBy(partitionName).format("parquet").saveAsTable(s"test_db.$table")
}
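Note that reading the CSV with only a header option makes Spark infer every column as a string, so the table written by saveAsTable ends up with all-string columns. A minimal sketch of supplying an explicit schema instead (the types here are assumptions: id_session is kept as a string to match the alphanumeric sample rows above, and time_write as a long epoch value; swap in TimestampType if that is what you actually store):

import org.apache.spark.sql.types.{StructField, StructType}

// Hypothetical explicit schema so the CSV columns land in Parquet with the
// intended types instead of all-string inference.
val journalSchema = StructType(Seq(
  StructField("name_process", StringType),
  StructField("id_session", StringType), // sample values are alphanumeric, so kept as a string
  StructField("time_write", LongType),   // assumed epoch-like long
  StructField("key", StringType),
  StructField("value", StringType)
))

val typedDf = spark.read
  .option("delimiter", ",")
  .option("header", "true")
  .schema(journalSchema)
  .csv(path)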
def getLastSession(processName: String)(implicit spark: SparkSession): Unit = {
  val df = spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")
    .select(
      col("name_process").cast(StringType),
      col("id_session").cast(StringType),
      col("time_write").cast(LongType),
      col("key").cast(StringType),
      col("value").cast(StringType)
    )

  val lastTime = df.select(col("time_write")).select(max("time_write")).collect()(0).get(0)
  val lastSession = df.filter(col("time_write").equalTo(lastTime)).select("id_session").head().getString(0)

  println(lastSession)
  println(TimeStamp.getCurrentTime)
}
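(As an aside, the max-then-filter lookup can be done in one pass; a sketch, assuming time_write compares correctly after the cast to long:)

// Alternative: sort descending by time_write and take the first id_session,
// avoiding the separate collect() of the max value.
val lastSessionAlt = df
  .orderBy(col("time_write").desc)
  .select("id_session")
  .head()
  .getString(0)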
Logs from Spark:
21/12/16 14:51:19 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage finished
21/12/16 14:51:19 INFO DAGScheduler: Job 3 finished: parquet at DataLineageJournal.scala:28, took 0,076899 s
org.apache.spark.sql.AnalysisException: cannot resolve '`name_process`' given input columns: [id_session, key, time_write, value];
'Project [unresolvedalias(cast('name_process as string), None), cast(id_session#78 as string) AS id_session#86, cast(time_write#79 as bigint) AS time_write#87L, cast(key#80 as string) AS key#88, cast(value#81 as string) AS value#89]
+- Relation[id_session#78,time_write#79,key#80,value#81] parquet
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis.applyOrElse(CheckAnalysis.scala:155)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis.applyOrElse(CheckAnalysis.scala:152)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp(TreeNode.scala:342)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:342)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren(TreeNode.scala:408)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren(TreeNode.scala:408)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
Problem
When you do

spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")

you are reading from a single partition directory. The partition value is encoded in the path rather than stored in the Parquet files, which is why the name_process column is missing from the input columns.
Solution:
You can do the following instead:

spark.read.parquet("spark-warehouse/test_db.db/test_table").filter(col("name_process") === processName)
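Since the table was registered with saveAsTable, another option is to go through the catalog instead of the raw warehouse path; Spark then resolves the partition column and the stored types for you. A sketch, assuming the same metastore and warehouse directory are in scope:

// Read via the metastore: name_process comes back as a regular column.
val bySession = spark.table("test_db.test_table")
  .filter(col("name_process") === processName)

// Or keep the path-based read but declare the table root as basePath,
// so partition discovery can still recover name_process:
val byPath = spark.read
  .option("basePath", "spark-warehouse/test_db.db/test_table")
  .parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")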