Spark: group concat equivalent in scala rdd
I have the following dataframe:
|-----id-------|----value------|-----desc------|
| 1 | v1 | d1 |
| 1 | v2 | d2 |
| 2 | v21 | d21 |
| 2 | v22 | d22 |
|--------------|---------------|---------------|
and I want to transform it into:
|-----id-------|----value------|-----desc------|
| 1 | v1;v2 | d1;d2 |
| 2 | v21;v22 | d21;d22 |
|--------------|---------------|---------------|
- Is it possible to do this via DataFrame operations?
- What would the RDD transformation look like in this case?
I think rdd.reduce is the key here, but I have no idea how to adapt it to this scenario.
Suppose you have something like this:
import scala.util.Random
import org.apache.spark.sql.SQLContext
val sqlc: SQLContext = ???
case class Record(id: Long, value: String, desc: String)
val testData = for {
(i, j) <- List.fill(30)(Random.nextInt(5), Random.nextInt(5))
} yield Record(i, s"v$i$j", s"d$i$j")
val df = sqlc.createDataFrame(testData)
You can easily aggregate the data:
import sqlc.implicits._
def aggConcat(col: String) = df
.map(row => (row.getAs[Long]("id"), row.getAs[String](col)))
.aggregateByKey(Vector[String]())(_ :+ _, _ ++ _)
val result = aggConcat("value").zip(aggConcat("desc")).map{
case ((id, value), (_, desc)) => (id, value, desc)
}.toDF("id", "values", "descs")
If you want concatenated strings instead of arrays, you can then run:
import org.apache.spark.sql.functions._
val resultConcat = result
.withColumn("values", concat_ws(";", $"values"))
.withColumn("descs" , concat_ws(";", $"descs" ))
After some research, I came up with something like this:
val data = sc.parallelize(
List(
("1", "v1", "d1"),
("1", "v2", "d2"),
("2", "v21", "d21"),
("2", "v22", "d22")))
.map{ case(id, value, desc)=>((id), (value, desc))}
.reduceByKey((x, y) => (x._1 + ";" + y._1, x._2 + ";" + y._2))
.map{ case(id,(value, desc))=>(id, value, desc)}.toDF("id", "value","desc")
.show()
That leaves me with:
+---+-------+-------+
| id|  value|   desc|
+---+-------+-------+
|  1|  v1;v2|  d1;d2|
|  2|v21;v22|d21;d22|
+---+-------+-------+
You can transform the data using Spark SQL:
import sqlContext.implicits._
case class Test(id: Int, value: String, desc: String)
val data = sc.parallelize(Seq((1, "v1", "d1"), (1, "v2", "d2"), (2, "v21", "d21"), (2, "v22", "d22")))
  .map(line => Test(line._1, line._2, line._3))
  .toDF()
data.registerTempTable("data")
val result = sqlContext.sql("select id, concat_ws(';', collect_list(value)) as value, concat_ws(';', collect_list(desc)) as desc from data group by id")
result.show
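The same aggregation can also be written with the DataFrame API instead of a SQL string. A sketch assuming Spark 1.6+ (on 1.x, collect_list needs a HiveContext) and the same data DataFrame as above:
import org.apache.spark.sql.functions.{collect_list, concat_ws}

// group by id, collect each column into a list, then join with ";"
val resultDf = data
  .groupBy("id")
  .agg(
    concat_ws(";", collect_list("value")).as("value"),
    concat_ws(";", collect_list("desc")).as("desc"))
resultDf.show()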
If you are using DataFrames, you can use a UDAF:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
class ConcatStringsUDAF(InputColumnName: String, sep: String = ",") extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField(InputColumnName, StringType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("concatString", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""

  private def concatStrings(str1: String, str2: String): String = {
    (str1, str2) match {
      case (s1: String, s2: String) => Seq(s1, s2).filter(_ != "").mkString(sep)
      case (null, s: String) => s
      case (s: String, null) => s
      case _ => ""
    }
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val acc1 = buffer.getAs[String](0)
    val acc2 = input.getAs[String](0)
    buffer(0) = concatStrings(acc1, acc2)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val acc1 = buffer1.getAs[String](0)
    val acc2 = buffer2.getAs[String](0)
    buffer1(0) = concatStrings(acc1, acc2)
  }

  def evaluate(buffer: Row): Any = buffer.getAs[String](0)
}
And then use it like this:
val stringConcatener = new ConcatStringsUDAF("Category_ID", ",")
data.groupBy("aaid", "os_country").agg(stringConcatener(data("X")).as("Xs"))
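Applied to the question's DataFrame (assuming a df with id, value and desc columns), that would look roughly like this:
// one UDAF instance per column to concatenate, using ";" as separator
val valueConcat = new ConcatStringsUDAF("value", ";")
val descConcat = new ConcatStringsUDAF("desc", ";")

val concatenated = df
  .groupBy("id")
  .agg(
    valueConcat(df("value")).as("value"),
    descConcat(df("desc")).as("desc"))
concatenated.show()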
And since Spark 1.6, have a look at Datasets and Aggregators.
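For illustration, a minimal Aggregator sketch against the Spark 2.x Dataset API (the 1.6 Aggregator signature differs slightly), reusing the Record case class from above; it concatenates the value field per key, and the same pattern applies to desc:
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// accumulate values into a Vector per key, then join once at the end
object ConcatValues extends Aggregator[Record, Vector[String], String] {
  def zero: Vector[String] = Vector.empty
  def reduce(buf: Vector[String], rec: Record): Vector[String] = buf :+ rec.value
  def merge(b1: Vector[String], b2: Vector[String]): Vector[String] = b1 ++ b2
  def finish(buf: Vector[String]): String = buf.mkString(";")
  def bufferEncoder: Encoder[Vector[String]] = Encoders.kryo[Vector[String]]
  def outputEncoder: Encoder[String] = Encoders.STRING
}

// usage sketch, assuming ds: Dataset[Record]
// ds.groupByKey(_.id).agg(ConcatValues.toColumn.name("values")).show()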