Pivot non-numeric table in Spark Scala
Is it possible to pivot a table with non-numeric values in Spark Scala? I have reviewed the following two Stack questions.
Following the approach from the "List in the Case-When" question, I can transform my data so that each data type becomes its own column, but I still end up with one row per entity/data-type combination (a sketch of that transformation follows the tables below).
id  tag  value
1   US   foo
1   UK   bar
1   CA   baz
2   US   hoo
2   UK   hah
2   CA   wah
id  US   UK   CA
1   foo
1        bar
1             baz
2   hoo
2        hah
2             wah
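For reference, a minimal sketch of that case-when transformation, assuming the column names above; this is my reading of the linked approach, not the exact code from that question:

    import org.apache.spark.sql.functions.when

    // One sparse column per tag: the value where the tag matches, null otherwise
    val sparse = df.select(df("id"),
      when(df("tag") === "US", df("value")).alias("US"),
      when(df("tag") === "UK", df("value")).alias("UK"),
      when(df("tag") === "CA", df("value")).alias("CA"))
    sparse.show()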
Is there a "first non-null" function that would collapse the multiple rows per entity into a single row?
id  US   UK   CA
1   foo  bar  baz
2   hoo  hah  wah
You could look at the aggregate method (or aggregateByKey). You just need to write an appropriate function that picks the first non-null element at each position.
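For illustration, a minimal sketch of such a per-position merge, assuming each row has already been packed into a Seq[Option[Any]] (as the full code below does); firstNonNull is a hypothetical name:

    // Pad the shorter row with None, then keep the first defined element per position
    def firstNonNull(a: Seq[Option[Any]], b: Seq[Option[Any]]): Seq[Option[Any]] =
      a.zipAll(b, None, None).map { case (x, y) => x.orElse(y) }

    // Used as the reduce function (or as both seqOp and combOp in aggregateByKey):
    // rdd.reduceByKey(firstNonNull)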
Here is a complete Scala class that creates a sample data frame and then pivots it. It is tailored specifically to this question, so I don't know how generally useful it will be. It also has not been extensively tested, so caveat emptor.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext}
import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object DemoPivot {
  def main(args: Array[String]): Unit = {

    // One output column per tag: the value where the tag matches, null otherwise
    def pivotColumn(df: DataFrame)(t: String): Column = {
      val col = when(df("tag") <=> lit(t), df("value"))
      col.alias(t)
    }

    // Build the wide frame: one column per distinct tag, one row per input row
    def pivotFrame(sqlContext: SQLContext, df: DataFrame): DataFrame = {
      val tags = df.select("tag").distinct.map(r => r.getString(0)).collect.toList
      df.select(df("id") :: tags.map(pivotColumn(df)): _*)
    }

    // Merge two rows position by position, preferring the first non-null element
    def aggregateRows(value: Seq[Option[Any]], agg: Seq[Option[Any]]): Seq[Option[Any]] = {
      for (i <- 0 until Math.max(value.size, agg.size)) yield i match {
        case x if x >= value.size  => agg(x)    // position only present in agg
        case y if y >= agg.size    => value(y)  // position only present in value
        case z if value(z).isEmpty => agg(z)    // value is null here; fall back to agg
        case a                     => value(a)  // first non-null wins
      }
    }

    // Collapse the multiple rows per id into a single row of first non-null values
    def collapseRows(sqlContext: SQLContext, df: DataFrame): DataFrame = {
      // RDDs cannot have null elements, so pack into Options and unpack before returning
      val rdd = df.map(row =>
        (Some(row(0)), row.toSeq.tail.map(element => if (element == null) None else Some(element))))
      val agg = rdd.reduceByKey(aggregateRows)
      val aggRdd = agg.map { case (key, list) =>
        Row.fromSeq(key.get :: list.map(element => if (element.isDefined) element.get else null).toList)
      }
      sqlContext.createDataFrame(aggRdd, df.schema)
    }

    val conf = new SparkConf().setAppName("Simple Pivot Demo")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Sample input: (id, tag, value) triples
    val data = List((1, "US", "foo"), (1, "UK", "bar"), (1, "CA", "baz"),
                    (2, "US", "hoo"), (2, "UK", "hah"), (2, "CA", "wah"))
    val rows = data.map(d => Row.fromSeq(d.productIterator.toList))
    val fields = Array(StructField("id", IntegerType, nullable = false),
                       StructField("tag", StringType, nullable = false),
                       StructField("value", StringType, nullable = false))
    val df = sqlContext.createDataFrame(sc.parallelize(rows), StructType(fields))
    df.show()

    val pivoted = pivotFrame(sqlContext, df)
    pivoted.show()

    val collapsed = collapseRows(sqlContext, pivoted)
    collapsed.show()
  }
}
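With the sample data above, the final collapsed.show() should print the desired two-row table (column order depends on the order in which the distinct tags are collected):

    +---+---+---+---+
    | id| US| UK| CA|
    +---+---+---+---+
    |  1|foo|bar|baz|
    |  2|hoo|hah|wah|
    +---+---+---+---+

As a follow-up note: on Spark 1.6 or later, the built-in pivot on a grouped DataFrame can replace most of this; a minimal sketch, assuming the same df:

    import org.apache.spark.sql.functions.first

    // Each (id, tag) pair holds exactly one value, so first() simply picks it
    val pivoted = df.groupBy("id").pivot("tag").agg(first("value"))
    pivoted.show()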