如何检测 Spark 中 CSV 文件模式的变化
How to detect change in CSV file schema in Spark
如果传入的 CSV 文件中的架构发生变化,我们如何在 spark 中处理它?
假设在第 1 天,
我得到了一个包含架构和数据的 csv 文件,
FirstName LastName Age
Sagar Patro 26
Akash Nayak 22
Amar Kumar 18
在第 10 天,
我传入的 CSV 文件架构发生了变化,如下所示
FirstName LastName Mobile Age
Sagar Patro 8984159475 26
Akash Nayak 9040988503 22
Amar Kumar 9337856871 18
我的需求一,
我想知道传入的 CSV 文件的架构是否有任何变化。
我的要求2,
我想忽略那些新添加的列并继续使用我之前的模式,即第 1 天模式数据。
我的要求3,
如果传入的 csv 数据的模式发生变化,我也想自动添加新模式,即第 10 天模式
import org.apache.spark.sql.DataFrame
object SchemaDiff {
def main(args: Array[String]): Unit = {
// Just because its a simple CSV not considering column data type changes
val df1 : DataFrame = null // Dataframe for yesterday's data
val df2 : DataFrame = null // Dataframe for today's data
val deltaColumnNames = df2.columns.diff(df1.columns)
val ignoreSchemaChange = true
if(!deltaColumnNames.isEmpty) {
println("Schema change")
}
val resultDf = if(ignoreSchemaChange) {
df2.toDF(df1.columns: _*) // Maintain yesterday's schema
} else {
df2 // Use updated schema
}
}
}
如果传入的 CSV 文件中的架构发生变化,我们如何在 spark 中处理它?
假设在第 1 天, 我得到了一个包含架构和数据的 csv 文件,
FirstName LastName Age
Sagar Patro 26
Akash Nayak 22
Amar Kumar 18
在第 10 天, 我传入的 CSV 文件架构发生了变化,如下所示
FirstName LastName Mobile Age
Sagar Patro 8984159475 26
Akash Nayak 9040988503 22
Amar Kumar 9337856871 18
我的需求一,
我想知道传入的 CSV 文件的架构是否有任何变化。
我的要求2,
我想忽略那些新添加的列并继续使用我之前的模式,即第 1 天模式数据。
我的要求3,
如果传入的 csv 数据的模式发生变化,我也想自动添加新模式,即第 10 天模式
import org.apache.spark.sql.DataFrame
object SchemaDiff {
def main(args: Array[String]): Unit = {
// Just because its a simple CSV not considering column data type changes
val df1 : DataFrame = null // Dataframe for yesterday's data
val df2 : DataFrame = null // Dataframe for today's data
val deltaColumnNames = df2.columns.diff(df1.columns)
val ignoreSchemaChange = true
if(!deltaColumnNames.isEmpty) {
println("Schema change")
}
val resultDf = if(ignoreSchemaChange) {
df2.toDF(df1.columns: _*) // Maintain yesterday's schema
} else {
df2 // Use updated schema
}
}
}