在 Spark 中删除不符合 schema 的行
Drop rows in Spark that don't follow the schema
目前,我的表的 schema 是:
root
|-- product_id: integer (nullable = true)
|-- product_name: string (nullable = true)
|-- aisle_id: string (nullable = true)
|-- department_id: string (nullable = true)
我想对上面的表应用下面的 schema,并删除所有不符合该 schema 的行:
// Target schema for the products table. Every column is nullable; product_id,
// aisle_id and department_id are expected to hold integers, product_name a string.
val productsSchema = StructType(
  Seq(
    "product_id"    -> IntegerType,
    "product_name"  -> StringType,
    "aisle_id"      -> IntegerType,
    "department_id" -> IntegerType
  ).map { case (name, dataType) => StructField(name, dataType, nullable = true) }
)
如果数据与 schema 不匹配,Spark 会在该列中填入 null 值。
因此只需过滤掉所有列都为 null 的行即可。
使用 filter
过滤掉所有列均为 null 的行。
scala> "cat /tmp/sample.json".! // JSON File Data, two rows do not match the schema (a misnamed key and a malformed record).
{"product_id":1,"product_name":"sampleA","aisle_id":"AA","department_id":"AAD"}
{"product_id":2,"product_name":"sampleBB","aisle_id":"AAB","department_id":"AADB"}
{"product_id":3,"product_name":"sampleCC","aisle_id":"CC","department_id":"CCC"}
{"product_id":3,"product_name":"sampledd","aisle_id":"dd","departmentId":"ddd"}
{"name","srinivas","age":29}
res100: Int = 0
scala> schema.printTreeString
root
|-- aisle_id: string (nullable = true)
|-- department_id: string (nullable = true)
|-- product_id: long (nullable = true)
|-- product_name: string (nullable = true)
scala> val df = spark.read.schema(schema).option("badRecordsPath", "/tmp/badRecordsPath").format("json").load("/tmp/sample.json") // Loading Json data & if schema is not matching we will be getting null rows for all columns.
df: org.apache.spark.sql.DataFrame = [aisle_id: string, department_id: string ... 2 more fields]
scala> df.show(false)
+--------+-------------+----------+------------+
|aisle_id|department_id|product_id|product_name|
+--------+-------------+----------+------------+
|AA |AAD |1 |sampleA |
|AAB |AADB |2 |sampleBB |
|CC |CCC |3 |sampleCC |
|dd |null |3 |sampledd |
|null |null |null |null |
+--------+-------------+----------+------------+
scala> df.filter(df.columns.map(c => s"${c} is not null").mkString(" or ")).show(false) // Filter null rows.
+--------+-------------+----------+------------+
|aisle_id|department_id|product_id|product_name|
+--------+-------------+----------+------------+
|AA |AAD |1 |sampleA |
|AAB |AADB |2 |sampleBB |
|CC |CCC |3 |sampleCC |
|dd |null |3 |sampledd |
+--------+-------------+----------+------------+
scala>
请查看 DataFrame 上的 na.drop 函数:它可以在行中出现 null 值时删除该行,也可以按一行中最少的非 null 值个数(minNonNulls)删除,或者只根据指定列中的 null 值删除行。
scala> sc.parallelize(Seq((1,"a","a"),(1,"a","a"),(2,"b","b"),(3,"c","c"),(4,"d","d"),(4,"d",null))).toDF
res7: org.apache.spark.sql.DataFrame = [_1: int, _2: string ... 1 more field]
scala> res7.show()
+---+---+----+
| _1| _2| _3|
+---+---+----+
| 1| a| a|
| 1| a| a|
| 2| b| b|
| 3| c| c|
| 4| d| d|
| 4| d|null|
+---+---+----+
//dropping row if a null is found
scala> res7.na.drop.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| a| a|
| 1| a| a|
| 2| b| b|
| 3| c| c|
| 4| d| d|
+---+---+---+
//drops rows that have fewer than `minNonNulls = 3` non-null values
scala> res7.na.drop(minNonNulls = 3).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| a| a|
| 1| a| a|
| 2| b| b|
| 3| c| c|
| 4| d| d|
+---+---+---+
//not dropping any
scala> res7.na.drop(minNonNulls = 2).show()
+---+---+----+
| _1| _2| _3|
+---+---+----+
| 1| a| a|
| 1| a| a|
| 2| b| b|
| 3| c| c|
| 4| d| d|
| 4| d|null|
+---+---+----+
//drops row based on nulls in `_3` column
scala> res7.na.drop(Seq("_3")).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| a| a|
| 1| a| a|
| 2| b| b|
| 3| c| c|
| 4| d| d|
+---+---+---+
加载数据时使用 "DROPMALFORMED" 模式,以丢弃损坏(无法按 schema 解析)的记录。
// Load JSON against an explicit schema. In "DROPMALFORMED" mode Spark discards
// records it reports as malformed instead of keeping them as null-filled rows.
// NOTE(review): per the Spark docs, this includes JSON values that cannot be
// converted to the schema's types (e.g. "AA" for an IntegerType column) — confirm
// on the Spark version in use.
// The CSV-only "header" option does nothing for the JSON source, so it is omitted.
spark.read.format("json")
  .option("mode", "DROPMALFORMED")
  .schema(productsSchema)
  .load("sample.json")
目前,我的表的 schema 是:
root
|-- product_id: integer (nullable = true)
|-- product_name: string (nullable = true)
|-- aisle_id: string (nullable = true)
|-- department_id: string (nullable = true)
我想对上面的表应用下面的 schema,并删除所有不符合该 schema 的行:
// Target schema for the products table. Every column is nullable; product_id,
// aisle_id and department_id are expected to hold integers, product_name a string.
val productsSchema = StructType(
  Seq(
    "product_id"    -> IntegerType,
    "product_name"  -> StringType,
    "aisle_id"      -> IntegerType,
    "department_id" -> IntegerType
  ).map { case (name, dataType) => StructField(name, dataType, nullable = true) }
)
如果数据与 schema 不匹配,Spark 会在该列中填入 null 值。
因此只需过滤掉所有列都为 null 的行即可。
使用 filter
过滤掉所有列均为 null 的行。
scala> "cat /tmp/sample.json".! // JSON File Data, two rows do not match the schema (a misnamed key and a malformed record).
{"product_id":1,"product_name":"sampleA","aisle_id":"AA","department_id":"AAD"}
{"product_id":2,"product_name":"sampleBB","aisle_id":"AAB","department_id":"AADB"}
{"product_id":3,"product_name":"sampleCC","aisle_id":"CC","department_id":"CCC"}
{"product_id":3,"product_name":"sampledd","aisle_id":"dd","departmentId":"ddd"}
{"name","srinivas","age":29}
res100: Int = 0
scala> schema.printTreeString
root
|-- aisle_id: string (nullable = true)
|-- department_id: string (nullable = true)
|-- product_id: long (nullable = true)
|-- product_name: string (nullable = true)
scala> val df = spark.read.schema(schema).option("badRecordsPath", "/tmp/badRecordsPath").format("json").load("/tmp/sample.json") // Loading Json data & if schema is not matching we will be getting null rows for all columns.
df: org.apache.spark.sql.DataFrame = [aisle_id: string, department_id: string ... 2 more fields]
scala> df.show(false)
+--------+-------------+----------+------------+
|aisle_id|department_id|product_id|product_name|
+--------+-------------+----------+------------+
|AA |AAD |1 |sampleA |
|AAB |AADB |2 |sampleBB |
|CC |CCC |3 |sampleCC |
|dd |null |3 |sampledd |
|null |null |null |null |
+--------+-------------+----------+------------+
scala> df.filter(df.columns.map(c => s"${c} is not null").mkString(" or ")).show(false) // Filter null rows.
+--------+-------------+----------+------------+
|aisle_id|department_id|product_id|product_name|
+--------+-------------+----------+------------+
|AA |AAD |1 |sampleA |
|AAB |AADB |2 |sampleBB |
|CC |CCC |3 |sampleCC |
|dd |null |3 |sampledd |
+--------+-------------+----------+------------+
scala>
请查看 DataFrame 上的 na.drop 函数:它可以在行中出现 null 值时删除该行,也可以按一行中最少的非 null 值个数(minNonNulls)删除,或者只根据指定列中的 null 值删除行。
scala> sc.parallelize(Seq((1,"a","a"),(1,"a","a"),(2,"b","b"),(3,"c","c"),(4,"d","d"),(4,"d",null))).toDF
res7: org.apache.spark.sql.DataFrame = [_1: int, _2: string ... 1 more field]
scala> res7.show()
+---+---+----+
| _1| _2| _3|
+---+---+----+
| 1| a| a|
| 1| a| a|
| 2| b| b|
| 3| c| c|
| 4| d| d|
| 4| d|null|
+---+---+----+
//dropping row if a null is found
scala> res7.na.drop.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| a| a|
| 1| a| a|
| 2| b| b|
| 3| c| c|
| 4| d| d|
+---+---+---+
//drops rows that have fewer than `minNonNulls = 3` non-null values
scala> res7.na.drop(minNonNulls = 3).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| a| a|
| 1| a| a|
| 2| b| b|
| 3| c| c|
| 4| d| d|
+---+---+---+
//not dropping any
scala> res7.na.drop(minNonNulls = 2).show()
+---+---+----+
| _1| _2| _3|
+---+---+----+
| 1| a| a|
| 1| a| a|
| 2| b| b|
| 3| c| c|
| 4| d| d|
| 4| d|null|
+---+---+----+
//drops row based on nulls in `_3` column
scala> res7.na.drop(Seq("_3")).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| a| a|
| 1| a| a|
| 2| b| b|
| 3| c| c|
| 4| d| d|
+---+---+---+
加载数据时使用 "DROPMALFORMED" 模式,以丢弃损坏(无法按 schema 解析)的记录。
// Load JSON against an explicit schema. In "DROPMALFORMED" mode Spark discards
// records it reports as malformed instead of keeping them as null-filled rows.
// NOTE(review): per the Spark docs, this includes JSON values that cannot be
// converted to the schema's types (e.g. "AA" for an IntegerType column) — confirm
// on the Spark version in use.
// The CSV-only "header" option does nothing for the JSON source, so it is omitted.
spark.read.format("json")
  .option("mode", "DROPMALFORMED")
  .schema(productsSchema)
  .load("sample.json")