Printing column names that are different in a dataframe
I have this dataframe:
+-----+-------+-----------+-------------------+-----+
|empID|Zipcode|ZipCodeType|City |State|
+-----+-------+-----------+-------------------+-----+
|1000 |704 |STANDARD |PARC PARQUE |PR |
|1000 |704 |STANDARD |PASEO COSTA DEL SUR|PR |
|1001 |709 |STANDARD |BDA SAN LUIS |PR |
|1001 |76166 |UNIQUE |CINGULAR WIRELESS |TX |
|1002 |76177 |STANDARD |FORT WORTH |TX |
|1002 |76177 |STANDARD |FT WORTH |TX |
|1003 |704 |STANDARD |URB EUGENE RICE |PR |
|1003 |85209 |STANDARD |MESA |AZ |
|1004 |85210 |STANDARD |MESA |AZ |
|1004 |32046 |STANDARD |HILLIARD |FL |
+-----+-------+-----------+-------------------+-----+
For each empID, I need to print the names of the columns whose values differ:
+-----+---------------------------------+
|empID|nonMatchingColumnNames |
+-----+---------------------------------+
|1002 |City |
|1000 |City |
|1001 |State, City, ZipCodeType, Zipcode|
|1003 |State, City, Zipcode |
|1004 |State, City, Zipcode |
+-----+---------------------------------+
The strategy I took was to build a struct and collect_set all the values, then check whether each set's size is > 1 and print those column names. Here is my code:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, collect_set, from_json, struct}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import spark.implicits._

val schema = new StructType()
  .add("empID", IntegerType, true)
  .add("Zipcode", StringType, true)
  .add("ZipCodeType", StringType, true)
  .add("City", StringType, true)
  .add("State", StringType, true)

val idColumn = "empID"
val dfJSON = dfFromText
  .withColumn("jsonData", from_json(col("value"), schema))
  .select("jsonData.*")
dfJSON.printSchema()
dfJSON.show(false)

// one (collect_set aggregation, alias) pair per non-id column
val aggMap = dfJSON.columns
  .filterNot(x => x == idColumn)
  .map(colName => (collect_set(colName).alias(s"${colName}_asList"), s"${colName}_asList"))
aggMap.foreach(println)

val aggMapColumns = aggMap.map(x => x._1)
val columnsAsList = dfJSON.groupBy(col(idColumn)).agg(aggMapColumns.head, aggMapColumns.tail: _*)
columnsAsList.show(false)

val combinedDF = columnsAsList.select(
  col(idColumn),
  struct(aggMap.map(x => col(x._2)): _*).alias("combined_struct")
)
combinedDF.printSchema()
combinedDF.show(false)

// (index in struct, column name) pairs for the comparison step
val columnsToCompare = dfJSON.columns.filterNot(x => x == idColumn).zipWithIndex.map { case (x, y) => (y, x) }

val output = combinedDF.rdd.map { row =>
  val empNo = row.getAs[Int](0)
  val combinedStruct: Row = row.getAs[AnyRef]("combined_struct").asInstanceOf[Row]
  val nonMatchingColumns = columnsToCompare.foldLeft(List[String]()) { (acc, item) =>
    val counts = combinedStruct.getAs[Seq[String]](item._1).length
    if (counts == 1) acc else item._2 :: acc
  }
  (empNo, nonMatchingColumns.mkString(", "))
}.toDF(idColumn, "nonMatchingColumnNames")
output.show(false)
It works fine on my local machine, but when I ported it to spark-shell (this is an ad-hoc query), I get a null pointer exception when I try to convert the dataframe to an RDD and iterate over each item in the struct.
You can get the string containing the list of columns whose values are not unique using only Spark's built-in functions:

- use countDistinct to determine whether a given column has more than one value for a given empID
- if the distinct count is greater than 1, use when to keep the column name
- iterate over the columns and collect the results of this iteration into an array using array
- build a string from this array using concat_ws
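One detail the steps above rely on: `when` without an `otherwise` yields null for the columns whose values all match, and `concat_ws` silently drops nulls, which is what keeps the final string clean. A minimal plain-Scala sketch of that null-skipping join behavior (`concatWs` is a hypothetical helper mimicking the Spark function, not part of any API):

```scala
// Mimics Spark's concat_ws semantics: null elements are skipped, so the
// columns for which `when` produced null simply vanish from the result.
def concatWs(sep: String, parts: Seq[String]): String =
  parts.filter(_ != null).mkString(sep)

println(concatWs(", ", Seq("Zipcode", null, "City", null))) // prints "Zipcode, City"
```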
The complete code is as follows:
import org.apache.spark.sql.functions.{array, concat_ws, countDistinct, lit, when}

val output = dfJSON.groupBy("empID").agg(
  concat_ws(
    ", ",
    array(dfJSON.columns.filter(_ != "empID").map(c => when(countDistinct(c) > 1, lit(c))): _*)
  ).as("nonMatchingColumnNames")
)
With your input dataframe, you get the following output:
+-----+---------------------------------+
|empID|nonMatchingColumnNames |
+-----+---------------------------------+
|1002 |City |
|1000 |City |
|1001 |Zipcode, ZipCodeType, City, State|
|1003 |Zipcode, City, State |
|1004 |Zipcode, City, State |
+-----+---------------------------------+
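The per-group logic behind that aggregation can be sanity-checked without a Spark session. This is a plain-Scala sketch (not the Spark code itself) that replays the "distinct count > 1 per empID" rule on a subset of the sample rows:

```scala
// Plain-Scala replay of the countDistinct-based rule on sample rows:
// for each empID, a column is "non-matching" if it has > 1 distinct value.
val columns = Seq("Zipcode", "ZipCodeType", "City", "State")
val rows: Seq[(Int, Seq[String])] = Seq(
  (1000, Seq("704", "STANDARD", "PARC PARQUE", "PR")),
  (1000, Seq("704", "STANDARD", "PASEO COSTA DEL SUR", "PR")),
  (1001, Seq("709", "STANDARD", "BDA SAN LUIS", "PR")),
  (1001, Seq("76166", "UNIQUE", "CINGULAR WIRELESS", "TX")),
  (1002, Seq("76177", "STANDARD", "FORT WORTH", "TX")),
  (1002, Seq("76177", "STANDARD", "FT WORTH", "TX"))
)

val nonMatching: Map[Int, String] = rows
  .groupBy(_._1)
  .map { case (empID, grp) =>
    val names = columns.indices
      .filter(i => grp.map(_._2(i)).distinct.size > 1) // distinct count > 1
      .map(columns)
    empID -> names.mkString(", ")
  }

println(nonMatching(1000)) // prints "City"
println(nonMatching(1001)) // prints "Zipcode, ZipCodeType, City, State"
```
Column order in the result follows the schema order, matching the built-in solution's output.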