如何检查 Hive 中的损坏记录 table
How to check for corrupt records in Hive table
我有一个配置单元 table,每天都会增加数据。在某一天,一些损坏的记录被插入到 table。有没有一种方法可以将 table 与 HDFS 上的主文件相匹配,并从 Hive
中提取损坏的记录
或
如何识别具有 100 万行的配置单元 table 中的损坏记录?
使用 join, except1
找出加载到 Hive table vs 文件中的损坏记录。
Example:
//read the file
val df=spark.read.<format>("<path>")
//read hive table
val df1=spark.read.table("<db>.<hive_table_name>")
//without using md5 hash
df.exceptAll(df1).show()
df1.exceptAll(df).show()
//create md5 hash by concatenating all column values
val df2=df.withColumn("md_hash",md5(concat_ws(",",df.columns.map(c => col(c)): _*))).select("md_hash")
val df3=df1.withColumn("md_hash",md5(concat_ws(",",df.columns.map(c => col(c)): _*))).select("md_hash")
//get non matching rows from df2 that are not existed in df3
df2.except(df3).show()
df2.exceptAll(df3).show()
//get non matching rows from df3 that are not existed in df2
df3.exceptA(df2).show()
df3.exceptAll(df2).show()
//or using full outer join
df3.join(df2,df3("md_hash") === df2("md_hash"),"full").
filter((df2("md_hash").isNull || df3("md_hash").isNull)).
show(10,false)
我有一个配置单元 table,每天都会增加数据。在某一天,一些损坏的记录被插入到 table。有没有一种方法可以将 table 与 HDFS 上的主文件相匹配,并从 Hive
中提取损坏的记录或
如何识别具有 100 万行的配置单元 table 中的损坏记录?
使用 join, except1
找出加载到 Hive table vs 文件中的损坏记录。
Example:
//read the file
val df=spark.read.<format>("<path>")
//read hive table
val df1=spark.read.table("<db>.<hive_table_name>")
//without using md5 hash
df.exceptAll(df1).show()
df1.exceptAll(df).show()
//create md5 hash by concatenating all column values
val df2=df.withColumn("md_hash",md5(concat_ws(",",df.columns.map(c => col(c)): _*))).select("md_hash")
val df3=df1.withColumn("md_hash",md5(concat_ws(",",df.columns.map(c => col(c)): _*))).select("md_hash")
//get non matching rows from df2 that are not existed in df3
df2.except(df3).show()
df2.exceptAll(df3).show()
//get non matching rows from df3 that are not existed in df2
df3.exceptA(df2).show()
df3.exceptAll(df2).show()
//or using full outer join
df3.join(df2,df3("md_hash") === df2("md_hash"),"full").
filter((df2("md_hash").isNull || df3("md_hash").isNull)).
show(10,false)