Iterating over a grouped dataset in Spark 1.6
In an ordered dataset I want to aggregate data until a condition is met, but grouped by a certain key.
To set some context for my question, I have simplified it to the problem statement below:
In Spark I need to aggregate strings, grouped by key, until a user stops "shouting" (the 2nd char in a string is not uppercase).
Sample dataset:
ID, text, timestamps
1, "OMG I like bananas", 123
1, "Bananas are the best", 234
1, "MAN I love banana", 1235
2, "ORLY? I'm more into grapes", 123565
2, "BUT I like apples too", 999
2, "unless you count veggies", 9999
2, "THEN don't forget tomatoes", 999999
The expected result would be:
1, "OMG I like bananas Bananas are the best"
2, "ORLY? I'm more into grapes BUT I like apples too unless you count veggies"
With groupBy and agg I can't seem to set a condition like "stop when an uppercase char is found".
This only works in Spark 2.1 or above.
What you want to do is possible, but it may be expensive.
First, let's create some test data. As a general piece of advice: when you ask a question on Stack Overflow, please provide something like this so people have somewhere to start.
import spark.sqlContext.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Test data: (ID, text, timestamps)
val df = List(
  (1, "OMG I like bananas", 1),
  (1, "Bananas are the best", 2),
  (1, "MAN I love banana", 3),
  (2, "ORLY? I'm more into grapes", 1),
  (2, "BUT I like apples too", 2),
  (2, "unless you count veggies", 3),
  (2, "THEN don't forget tomatoes", 4)
).toDF("ID", "text", "timestamps")
To get a column containing the texts collected in order, we need to add a new column using a window function. The collected list grows row by row within each ID, so the longest list (the one max picks out in the groupBy below) is the complete one.
Using the Spark shell:
scala> val df2 = df.withColumn("coll", collect_list("text").over(Window.partitionBy("id").orderBy("timestamps")))
df2: org.apache.spark.sql.DataFrame = [ID: int, text: string ... 2 more fields]
scala> val x = df2.groupBy("ID").agg(max($"coll").as("texts"))
x: org.apache.spark.sql.DataFrame = [ID: int, texts: array<string>]
scala> x.collect.foreach(println)
[1,WrappedArray(OMG I like bananas, Bananas are the best, MAN I love banana)]
[2,WrappedArray(ORLY? I'm more into grapes, BUT I like apples too, unless you count veggies, THEN don't forget tomatoes)]
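As a small aside, the same collected list can be obtained without the groupBy/max trick by giving the window an explicit unbounded frame, so every row already carries the complete list for its ID, and then deduplicating. This is just a variation I'm sketching here (it needs the Spark 2.1+ frame constants), not something I have benchmarked:

// Variation: collect over an unbounded frame, then keep one row per ID (untested sketch)
val w = Window.partitionBy("ID").orderBy("timestamps")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val perId = df
  .withColumn("texts", collect_list("text").over(w))
  .select("ID", "texts")
  .dropDuplicates("ID")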
To get the actual text, we probably need a UDF. Here's mine (I'm far from a Scala expert, so bear with me):
import scala.collection.mutable

// Keep appending texts while the user is "shouting" (first two chars both
// uppercase); the first non-shouting text is still included, then we stop.
val aggText: Seq[String] => String = (list: Seq[String]) => {
  def tex(arr: Seq[String], accum: Seq[String]): Seq[String] = arr match {
    case Seq()       => accum
    case Seq(single) => accum :+ single
    case Seq(str, xs @ _*) =>
      if (str.length >= 2 && !(str.charAt(0).isUpper && str.charAt(1).isUpper))
        tex(Nil, accum :+ str)  // not "shouting" any more: keep it and stop
      else
        tex(xs, accum :+ str)   // still "shouting": keep it and carry on
  }
  tex(list, Seq()).mkString(" ")
}
val textUDF = udf(aggText(_: mutable.WrappedArray[String]))
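To convince ourselves that the recursion stops in the right place, we can try the function on the texts collected for ID 2 directly in plain Scala (just a quick check I'm adding, no Spark involved):

val check = Seq(
  "ORLY? I'm more into grapes",
  "BUT I like apples too",
  "unless you count veggies",
  "THEN don't forget tomatoes")
println(aggText(check))
// ORLY? I'm more into grapes BUT I like apples too unless you count veggies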
So we have a dataframe with the texts collected in the correct order, and a Scala function (wrapped as a UDF). Let's put it all together:
scala> val x = df2.groupBy("ID").agg(max($"coll").as("texts"))
x: org.apache.spark.sql.DataFrame = [ID: int, texts: array<string>]
scala> val y = x.select($"ID", textUDF($"texts"))
y: org.apache.spark.sql.DataFrame = [ID: int, UDF(texts): string]
scala> y.collect.foreach(println)
[1,OMG I like bananas Bananas are the best]
[2,ORLY? I'm more into grapes BUT I like apples too unless you count veggies]
I think this is the result you were after.
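If the window + collect_list route turns out to be too expensive for your data, an alternative is to do the grouping with the typed Dataset API and fold each group in plain Scala. The sketch below is untested against your setup and assumes each group fits in memory; Msg is just a helper case class I'm introducing for illustration:

// Untested sketch: group with the typed Dataset API and reuse aggText per group
case class Msg(ID: Int, text: String, timestamps: Int)

val alt = df.as[Msg]
  .groupByKey(_.ID)
  .mapGroups { (id, rows) =>
    // sort this group's rows by timestamp in memory, then apply the same stop logic
    val texts = rows.toSeq.sortBy(_.timestamps).map(_.text)
    (id, aggText(texts))
  }
  .toDF("ID", "texts")

alt.collect.foreach(println)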