JDBCRDD 或 RDD 中的聚合
Aggregations in JDBCRDD or RDD
我是 Sacla 和 Spark 的新手,我正在尝试使用 jdbcRDD 使用 Spark 在 SqlServer 上创建一个 SQL 查询,并使用映射和聚合对其进行一些转换。
这就是我所拥有的,一个包含 n 个字符串列和 m 个数字列的 Table。
喜欢
"A", "A1",1,2
"A", "A1",4,3
"A", "A2",3,4
"B", "B1",6,7
...
...
我正在寻找的是创建一个层次结构结构来对字符串进行分组并聚合数字列,例如
A
|->A1
|->(5,5)
|->A2
|->(3,4)
B
|->B1
|->(6,7)
我能够创建层次结构,但无法对数值列表执行聚合。
如果您通过 JDBC 加载数据,我会简单地使用 DataFrames:
import sqlContext.implicits._
import org.apache.spark.sql.functions.sum
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
val options: Map[(String, String)] = ???
val df: DataFrame = sqlContext.read
.format("jdbc")
.options(options)
.load()
.toDF("k1", "k2", "v1", "v2")
df.printSchema
// root
// |-- k1: string (nullable = true)
// |-- k2: string (nullable = true)
// |-- v1: integer (nullable = true)
// |-- v2: integer (nullable = true)
df.show
// +---+---+---+---+
// | k1| k2| v1| v2|
// +---+---+---+---+
// | A| A1| 1| 2|
// | A| A1| 4| 3|
// | A| A2| 3| 4|
// | B| B1| 6| 7|
// +---+---+---+---+
有了上面的输入,你所需要的只是一个基本的聚合
df
.groupBy($"k1", $"k2")
.agg(sum($"v1").alias("v1"), sum($"v2").alias("v2")).show
// +---+---+---+---+
// | k1| k2| v1| v2|
// +---+---+---+---+
// | A| A1| 5| 5|
// | A| A2| 3| 4|
// | B| B1| 6| 7|
// +---+---+---+---+
如果你有这样的 RDD:
val rdd RDD[(String, String, Int, Int)] = ???
rdd.first
// (String, String, Int, Int) = (A,A1,1,2)
没有理由建立复杂的层次结构。简单的 PairRDD
就足够了:
val aggregated: RDD[((String, String), breeze.linalg.Vector[Int])] = rdd
.map{case (k1, k2, v1, v2) => ((k1, k2), breeze.linalg.Vector(v1, v2))}
.reduceByKey(_ + _)
aggregated.first
// ((String, String), breeze.linalg.Vector[Int]) = ((A,A2),DenseVector(3, 4))
保持层次结构是无效的,但您可以像这样在 RDD
之上分组:
aggregated.map{case ((k1, k2), v) => (k1, (k2, v))}.groupByKey
我是 Sacla 和 Spark 的新手,我正在尝试使用 jdbcRDD 使用 Spark 在 SqlServer 上创建一个 SQL 查询,并使用映射和聚合对其进行一些转换。 这就是我所拥有的,一个包含 n 个字符串列和 m 个数字列的 Table。 喜欢
"A", "A1",1,2
"A", "A1",4,3
"A", "A2",3,4
"B", "B1",6,7
...
...
我正在寻找的是创建一个层次结构结构来对字符串进行分组并聚合数字列,例如
A
|->A1
|->(5,5)
|->A2
|->(3,4)
B
|->B1
|->(6,7)
我能够创建层次结构,但无法对数值列表执行聚合。
如果您通过 JDBC 加载数据,我会简单地使用 DataFrames:
import sqlContext.implicits._
import org.apache.spark.sql.functions.sum
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
val options: Map[(String, String)] = ???
val df: DataFrame = sqlContext.read
.format("jdbc")
.options(options)
.load()
.toDF("k1", "k2", "v1", "v2")
df.printSchema
// root
// |-- k1: string (nullable = true)
// |-- k2: string (nullable = true)
// |-- v1: integer (nullable = true)
// |-- v2: integer (nullable = true)
df.show
// +---+---+---+---+
// | k1| k2| v1| v2|
// +---+---+---+---+
// | A| A1| 1| 2|
// | A| A1| 4| 3|
// | A| A2| 3| 4|
// | B| B1| 6| 7|
// +---+---+---+---+
有了上面的输入,你所需要的只是一个基本的聚合
df
.groupBy($"k1", $"k2")
.agg(sum($"v1").alias("v1"), sum($"v2").alias("v2")).show
// +---+---+---+---+
// | k1| k2| v1| v2|
// +---+---+---+---+
// | A| A1| 5| 5|
// | A| A2| 3| 4|
// | B| B1| 6| 7|
// +---+---+---+---+
如果你有这样的 RDD:
val rdd RDD[(String, String, Int, Int)] = ???
rdd.first
// (String, String, Int, Int) = (A,A1,1,2)
没有理由建立复杂的层次结构。简单的 PairRDD
就足够了:
val aggregated: RDD[((String, String), breeze.linalg.Vector[Int])] = rdd
.map{case (k1, k2, v1, v2) => ((k1, k2), breeze.linalg.Vector(v1, v2))}
.reduceByKey(_ + _)
aggregated.first
// ((String, String), breeze.linalg.Vector[Int]) = ((A,A2),DenseVector(3, 4))
保持层次结构是无效的,但您可以像这样在 RDD
之上分组:
aggregated.map{case ((k1, k2), v) => (k1, (k2, v))}.groupByKey