Iterating over a grouped dataset in Spark 1.6

In an ordered dataset, I want to aggregate data until a condition is met, but grouped by a certain key.

To set some context for my question, I have simplified my problem to the following problem statement:

In Spark I need to aggregate strings, grouped by key, up to and including the point where a user stops "shouting" (i.e. the 2nd char in a string is not uppercase).

An example dataset:

ID, text, timestamps

1, "OMG I like bananas", 123
1, "Bananas are the best", 234
1, "MAN I love banana", 1235
2, "ORLY? I'm more into grapes", 123565
2, "BUT I like apples too", 999
2, "unless you count veggies", 9999
2, "THEN don't forget tomatoes", 999999

The expected result being:

1, "OMG I like bananas Bananas are the best"
2, "ORLY? I'm more into grapes BUT I like apples too unless you count veggies"

With groupBy and agg I can't seem to set a condition like "stop when an uppercase char is found".
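
For illustration (a rough sketch, assuming the sample above is loaded into a DataFrame called df), this is the kind of aggregation I can write, but it blindly concatenates every string per key, with no way to stop at the first non-shouting one (and no order guarantee either):

import org.apache.spark.sql.functions._

// Concatenates ALL texts per ID; there is no hook to stop once a
// non-"shouting" string has been reached, and collect_list inside a
// plain groupBy does not guarantee any ordering.
df.groupBy("ID").agg(concat_ws(" ", collect_list("text")))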

This will only work on Spark 2.1 or above.

What you want to do is possible, but it may be expensive.

First, let's create some test data. As general advice, when you ask a question on Stack Overflow, please provide something like this so people have somewhere to start.

import spark.sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = List(
    (1, "OMG I like bananas", 1),
    (1, "Bananas are the best", 2),
    (1, "MAN I love banana", 3),
    (2, "ORLY? I'm more into grapes", 1),
    (2, "BUT I like apples too", 2),
    (2, "unless you count veggies", 3),
    (2, "THEN don't forget tomatoes", 4)
).toDF("ID", "text", "timestamps")

To get a column containing the texts collected in order, we need to add a new column using a window function.

Using the spark shell:

scala> val df2 = df.withColumn("coll", collect_list("text").over(Window.partitionBy("id").orderBy("timestamps")))
df2: org.apache.spark.sql.DataFrame = [ID: int, text: string ... 2 more fields]

scala> val x = df2.groupBy("ID").agg(max($"coll").as("texts"))
x: org.apache.spark.sql.DataFrame = [ID: int, texts: array<string>]

scala> x.collect.foreach(println)
[1,WrappedArray(OMG I like bananas, Bananas are the best, MAN I love banana)]
[2,WrappedArray(ORLY? I'm more into grapes, BUT I like apples too, unless you count veggies, THEN don't forget tomatoes)]
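
Because the window is ordered, collect_list builds a running prefix list per row, and since every prefix compares smaller than the full list, max picks out the complete list per ID. As an alternative sketch (my assumption: Spark 2.1+, where the unbounded frame constants exist; the names fullWindow and df3 are just for illustration), the frame can span the whole partition so every row already carries the full list and the max trick is not needed:

// A frame covering the entire partition: each row gets the complete,
// timestamp-ordered list of texts for its ID.
val fullWindow = Window
    .partitionBy("ID")
    .orderBy("timestamps")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val df3 = df
    .withColumn("coll", collect_list("text").over(fullWindow))
    .groupBy("ID")
    .agg(first($"coll").as("texts"))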

To get the actual texts we probably need a UDF. Here's mine (I'm far from being a Scala expert, so bear with me):

import scala.collection.mutable

// Walk the ordered texts, keeping each one until (and including) the
// first string that is not "shouted" (first two chars not both uppercase).
val aggText: Seq[String] => String = (list: Seq[String]) => {
    def tex(arr: Seq[String], accum: Seq[String]): Seq[String] = arr match {
        case Seq() => accum
        case Seq(single) => accum :+ single
        case Seq(str, xs @_*) =>
            if (str.length >= 2 && !(str.charAt(0).isUpper && str.charAt(1).isUpper))
                tex(Nil, accum :+ str)  // not shouting: keep it and stop
            else
                tex(xs, accum :+ str)   // still shouting: keep it and continue
    }

    val res = tex(list, Seq())
    res.mkString(" ")
}

val textUDF = udf(aggText(_: mutable.WrappedArray[String]))
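
A quick sanity check of aggText on the sample texts for ID 1 (plain Scala, no Spark needed); it reproduces the expected result from the question:

aggText(Seq("OMG I like bananas", "Bananas are the best", "MAN I love banana"))
// res: String = "OMG I like bananas Bananas are the best"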

So, we have a dataframe with the texts collected in the right order, and a Scala function (wrapped as a UDF). Let's piece it together:

scala> val x = df2.groupBy("ID").agg(max($"coll").as("texts"))
x: org.apache.spark.sql.DataFrame = [ID: int, texts: array<string>]

scala> val y = x.select($"ID", textUDF($"texts"))
y: org.apache.spark.sql.DataFrame = [ID: int, UDF(texts): string]

scala> y.collect.foreach(println)
[1,OMG I like bananas Bananas are the best]
[2,ORLY? I'm more into grapes BUT I like apples too unless you count veggies]

scala>

Which I think is the result you were looking for.
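
As a closing note: the question title mentions Spark 1.6, where collect_list is not available as a window function. A rough fallback sketch on the RDD API could look like this (my assumption, untested on 1.6; groupByKey pulls each key's rows onto a single node, so the same "expensive" caveat applies):

// Group rows by ID, order each group by timestamp, then reuse aggText.
val result = df.rdd
    .map(row => (row.getInt(0), (row.getInt(2), row.getString(1))))
    .groupByKey()
    .mapValues(rows => aggText(rows.toSeq.sortBy(_._1).map(_._2)))

result.collect.foreach(println)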