如何对 Scala 数据集执行复杂的操作
how to perform complicated manipulations on scala datasets
我是 scala 的新手,来自 sql 和 pandas 背景,scala 中的数据集对象给我带来了一些麻烦。
我有一个如下所示的数据集...
|car_num| colour|
+-----------+---------+
| 145| c|
| 132| p|
| 104| u|
| 110| c|
| 110| f|
| 113| c|
| 115| c|
| 11| i|
| 117| s|
| 118| a|
我使用案例 class 将其作为数据集加载,如下所示
case class carDS(carNum: String, Colour: String)
每个 car_num 都是独一无二的汽车,许多汽车都有多个条目。颜色栏是指汽车涂漆的颜色。
我想知道如何添加一个列,以给出汽车在没有绿色的情况下所进行的油漆作业总数 (g)。
到目前为止我已经试过了。
carDS
.map(x => (x.carNum, x.Colour))
.groupBy("_1")
.count()
.orderBy($"count".desc).show()
但我相信它只是给我一个汽车喷漆次数的计数栏。这不是最长的汽车连续喷漆次数而不是绿色。
我想我可能需要在我的查询中使用如下函数
def colourrun(sq: String): Int = {
println(sq)
sq.mkString(" ")
.split("g")
.filter(_.nonEmpty)
.map(_.trim)
.map(s => s.split(" ").length)
.max
}
但我不确定它应该放在哪里。
最终如果汽车 102 被涂成 r, b, g, b, o, y, r, g
我希望计数列给出 4 作为答案。
我该怎么做?
谢谢
这是一种方法,它涉及将给定汽车的喷漆作业分组为单调编号的组,这些组由颜色为“g”的喷漆作业分隔,然后是几个 groupBy/agg
表示喷漆作业的最大数量在颜色“g”的油漆作业之间。
(请注意,正在添加 timestamp
列以确保数据集中行的确定性排序。)
val ds = Seq(
("102", "r", 1), ("102", "b", 2), ("102", "g", 3), ("102", "b", 4), ("102", "o", 5), ("102", "y", 6), ("102", "r", 7), ("102", "g", 8),
("145", "c", 1), ("145", "g", 2), ("145", "b", 3), ("145", "r", 4), ("145", "g", 5), ("145", "c", 6), ("145", "g", 7)
).toDF("car_num", "colour", "timestamp").as[(String, String, Long)]
import org.apache.spark.sql.expressions.Window
val win = Window.partitionBy("car_num").orderBy("timestamp")
ds.
withColumn("group", sum(when($"colour" === "g", 1).otherwise(0)).over(win)).
groupBy("car_num", "group").agg(
when($"group" === 0, count("group")).otherwise(count("group") - 1).as("count")
).
groupBy("car_num").agg(max("count").as("max_between_g")).
show
// +-------+-------------+
// |car_num|max_between_g|
// +-------+-------------+
// | 102| 4|
// | 145| 2|
// +-------+-------------+
使用 DataFrame API 的替代方法是将 groupByKey
应用于数据集,然后是 mapGroups
,如下所示:
ds.
map(c => (c.car_num, c.colour)).
groupByKey(_._1).mapGroups{ case (k, iter) =>
val maxTuple = iter.map(_._2).foldLeft((0, 0)){ case ((cnt, mx), c) =>
if (c == "g") (0, math.max(cnt, mx)) else (cnt + 1, mx)
}
(k, maxTuple._2)
}.
show
// +---+---+
// | _1| _2|
// +---+---+
// |102| 4|
// |145| 2|
// +---+---+
我是 scala 的新手,来自 sql 和 pandas 背景,scala 中的数据集对象给我带来了一些麻烦。
我有一个如下所示的数据集...
|car_num| colour|
+-----------+---------+
| 145| c|
| 132| p|
| 104| u|
| 110| c|
| 110| f|
| 113| c|
| 115| c|
| 11| i|
| 117| s|
| 118| a|
我使用案例 class 将其作为数据集加载,如下所示
case class carDS(carNum: String, Colour: String)
每个 car_num 都是独一无二的汽车,许多汽车都有多个条目。颜色栏是指汽车涂漆的颜色。
我想知道如何添加一个列,以给出汽车在没有绿色的情况下所进行的油漆作业总数 (g)。
到目前为止我已经试过了。
carDS
.map(x => (x.carNum, x.Colour))
.groupBy("_1")
.count()
.orderBy($"count".desc).show()
但我相信它只是给我一个汽车喷漆次数的计数栏。这不是最长的汽车连续喷漆次数而不是绿色。
我想我可能需要在我的查询中使用如下函数
def colourrun(sq: String): Int = {
println(sq)
sq.mkString(" ")
.split("g")
.filter(_.nonEmpty)
.map(_.trim)
.map(s => s.split(" ").length)
.max
}
但我不确定它应该放在哪里。
最终如果汽车 102 被涂成 r, b, g, b, o, y, r, g 我希望计数列给出 4 作为答案。
我该怎么做? 谢谢
这是一种方法,它涉及将给定汽车的喷漆作业分组为单调编号的组,这些组由颜色为“g”的喷漆作业分隔,然后是几个 groupBy/agg
表示喷漆作业的最大数量在颜色“g”的油漆作业之间。
(请注意,正在添加 timestamp
列以确保数据集中行的确定性排序。)
val ds = Seq(
("102", "r", 1), ("102", "b", 2), ("102", "g", 3), ("102", "b", 4), ("102", "o", 5), ("102", "y", 6), ("102", "r", 7), ("102", "g", 8),
("145", "c", 1), ("145", "g", 2), ("145", "b", 3), ("145", "r", 4), ("145", "g", 5), ("145", "c", 6), ("145", "g", 7)
).toDF("car_num", "colour", "timestamp").as[(String, String, Long)]
import org.apache.spark.sql.expressions.Window
val win = Window.partitionBy("car_num").orderBy("timestamp")
ds.
withColumn("group", sum(when($"colour" === "g", 1).otherwise(0)).over(win)).
groupBy("car_num", "group").agg(
when($"group" === 0, count("group")).otherwise(count("group") - 1).as("count")
).
groupBy("car_num").agg(max("count").as("max_between_g")).
show
// +-------+-------------+
// |car_num|max_between_g|
// +-------+-------------+
// | 102| 4|
// | 145| 2|
// +-------+-------------+
使用 DataFrame API 的替代方法是将 groupByKey
应用于数据集,然后是 mapGroups
,如下所示:
ds.
map(c => (c.car_num, c.colour)).
groupByKey(_._1).mapGroups{ case (k, iter) =>
val maxTuple = iter.map(_._2).foldLeft((0, 0)){ case ((cnt, mx), c) =>
if (c == "g") (0, math.max(cnt, mx)) else (cnt + 1, mx)
}
(k, maxTuple._2)
}.
show
// +---+---+
// | _1| _2|
// +---+---+
// |102| 4|
// |145| 2|
// +---+---+