Spark Scala 按某个值范围拆分 DataFrame
Spark Scala Split DataFrame by some value range
假设我有一个数据框,其中包含一个名为 x
的列,其值范围为 [0, 1]
。我希望将它按 x
列的值拆分,范围如 [0, 0.1)
、[0.1, 0.2)
...[0.9, 1]
。有没有一个好的和快速的方法来做到这一点?我在 Scala 中使用 Spark 2。
更新:理想情况下应该有 10 个包含每个范围数据的新数据框。
如果您打算离散化双类型列,您可以这样做(将该列乘以 10,然后将其转换为整数类型,该列将被分成 10 个离散的 bin):
import org.apache.spark.sql.types.IntegerType
val df = Seq(0.32, 0.5, 0.99, 0.72, 0.11, 0.03).toDF("A")
// df: org.apache.spark.sql.DataFrame = [A: double]
df.withColumn("new", ($"A" * 10).cast(IntegerType)).show
+----+---+
| A|new|
+----+---+
|0.32| 3|
| 0.5| 5|
|0.99| 9|
|0.72| 7|
|0.11| 1|
|0.03| 0|
+----+---+
扩展@Psidom 创建范围的解决方案,这是为每个范围创建数据框的一种方法:
import org.apache.spark.sql.types.IntegerType
val df = Seq(0.2, 0.71, 0.95, 0.33, 0.28, 0.8, 0.73).toDF("x")
val df2 = df.withColumn("g", ($"x" * 10.0).cast(IntegerType))
df2.show
+----+---+
| x| g|
+----+---+
| 0.2| 2|
|0.71| 7|
|0.95| 9|
|0.33| 3|
|0.28| 2|
| 0.8| 8|
|0.73| 7|
+----+---+
val dfMap = df2.select($"g").distinct.
collect.
flatMap(_.toSeq).
map( g => g -> df2.where($"g" === g) ).
toMap
dfMap.getOrElse(3, null).show
+----+---+
| x| g|
+----+---+
|0.33| 3|
+----+---+
dfMap.getOrElse(7, null).show
+----+---+
| x| g|
+----+---+
|0.71| 7|
|0.73| 7|
+----+---+
[更新]
如果您的范围不规则,您可以定义一个函数,将 Double 映射到相应的 Int 范围 id,然后用 UDF
包裹它,如下所示:
val g: Double => Int = x => x match {
case x if (x >= 0.0 && x < 0.12345) => 1
case x if (x >= 0.12345 && x < 0.4834) => 2
case x if (x >= 0.4834 && x < 1.0) => 3
case _ => 99 // catch-all
}
val groupUDF = udf(g)
val df = Seq(0.1, 0.2, 0.71, 0.95, 0.03, 0.09, 0.44, 5.0).toDF("x")
val df2 = df.withColumn("g", groupUDF($"x"))
df2.show
+----+---+
| x| g|
+----+---+
| 0.1| 1|
| 0.2| 2|
|0.71| 3|
|0.95| 3|
|0.03| 1|
|0.09| 1|
|0.44| 2|
| 5.0| 99|
+----+---+
假设我有一个数据框,其中包含一个名为 x
的列,其值范围为 [0, 1]
。我希望将它按 x
列的值拆分,范围如 [0, 0.1)
、[0.1, 0.2)
...[0.9, 1]
。有没有一个好的和快速的方法来做到这一点?我在 Scala 中使用 Spark 2。
更新:理想情况下应该有 10 个包含每个范围数据的新数据框。
如果您打算离散化双类型列,您可以这样做(将该列乘以 10,然后将其转换为整数类型,该列将被分成 10 个离散的 bin):
import org.apache.spark.sql.types.IntegerType
val df = Seq(0.32, 0.5, 0.99, 0.72, 0.11, 0.03).toDF("A")
// df: org.apache.spark.sql.DataFrame = [A: double]
df.withColumn("new", ($"A" * 10).cast(IntegerType)).show
+----+---+
| A|new|
+----+---+
|0.32| 3|
| 0.5| 5|
|0.99| 9|
|0.72| 7|
|0.11| 1|
|0.03| 0|
+----+---+
扩展@Psidom 创建范围的解决方案,这是为每个范围创建数据框的一种方法:
import org.apache.spark.sql.types.IntegerType
val df = Seq(0.2, 0.71, 0.95, 0.33, 0.28, 0.8, 0.73).toDF("x")
val df2 = df.withColumn("g", ($"x" * 10.0).cast(IntegerType))
df2.show
+----+---+
| x| g|
+----+---+
| 0.2| 2|
|0.71| 7|
|0.95| 9|
|0.33| 3|
|0.28| 2|
| 0.8| 8|
|0.73| 7|
+----+---+
val dfMap = df2.select($"g").distinct.
collect.
flatMap(_.toSeq).
map( g => g -> df2.where($"g" === g) ).
toMap
dfMap.getOrElse(3, null).show
+----+---+
| x| g|
+----+---+
|0.33| 3|
+----+---+
dfMap.getOrElse(7, null).show
+----+---+
| x| g|
+----+---+
|0.71| 7|
|0.73| 7|
+----+---+
[更新]
如果您的范围不规则,您可以定义一个函数,将 Double 映射到相应的 Int 范围 id,然后用 UDF
包裹它,如下所示:
val g: Double => Int = x => x match {
case x if (x >= 0.0 && x < 0.12345) => 1
case x if (x >= 0.12345 && x < 0.4834) => 2
case x if (x >= 0.4834 && x < 1.0) => 3
case _ => 99 // catch-all
}
val groupUDF = udf(g)
val df = Seq(0.1, 0.2, 0.71, 0.95, 0.03, 0.09, 0.44, 5.0).toDF("x")
val df2 = df.withColumn("g", groupUDF($"x"))
df2.show
+----+---+
| x| g|
+----+---+
| 0.1| 1|
| 0.2| 2|
|0.71| 3|
|0.95| 3|
|0.03| 1|
|0.09| 1|
|0.44| 2|
| 5.0| 99|
+----+---+