Applying Bucketizer to Spark dataframe after partitioning based on a column value
I need to apply a Spark Bucketizer to the dataframe df below. This is sample data; the original dataframe has around 10k records.
instance name value percentage
A37 Histogram.ratio 1 0.20
A37 Histogram.ratio 20 0.34
A37 Histogram.ratio 50 0.04
A37 Histogram.ratio 500 0.13
A37 Histogram.ratio 2000 0.05
A37 Histogram.ratio 9000 0.32
A49 Histogram.ratio 1 0.50
A49 Histogram.ratio 20 0.24
A49 Histogram.ratio 25 0.09
A49 Histogram.ratio 55 0.12
A49 Histogram.ratio 120 0.06
A49 Histogram.ratio 300 0.08
I need to apply the bucketizer after partitioning the dataframe by the instance column. Each value of instance has a different splits array, defined as follows:
val splits_map = Map("A37" -> Array(0,30,1000,5000,9000), "A49" -> Array(0,10,30,80,998))
I would use the code below to bucketize a single column, but I need help partitioning the dataframe by the instance column and then applying bucketizer.transform:
val bucketizer = new Bucketizer().setInputCol("value").setOutputCol("value_range").setSplits(splits)
val df2 = bucketizer.transform(df)
df2.groupBy("value_range").sum("percentage").show()
Is it possible to split the dataFrame into multiple dataFrames based on the value of the instance column, bucketize the value column in each one, and then compute the sum of percentage with groupBy().sum()?
Expected output:
instance name bucket percentage
A37 Histogram.ratio 0 0.54
A37 Histogram.ratio 1 0.17
A37 Histogram.ratio 3 0.05
A37 Histogram.ratio 4 0.32
A49 Histogram.ratio 0 0.50
A49 Histogram.ratio 1 0.33
A49 Histogram.ratio 2 0.12
A49 Histogram.ratio 3 0.14
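One straightforward way to do what the question asks is to filter the dataframe once per instance value, apply a Bucketizer configured with that instance's splits, and union the results. Below is a minimal sketch of that idea, assuming df and splits_map are as shown above; the helper name bucketizePerInstance and the Int-to-Double conversion are my own additions (Bucketizer requires strictly increasing Array[Double] splits):

import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: run one Bucketizer per instance value and union the results.
// Values outside an instance's split range would need setHandleInvalid("keep")
// or wider splits; the sample data above stays inside the ranges.
def bucketizePerInstance(df: DataFrame, splitsMap: Map[String, Array[Double]]): DataFrame = {
  splitsMap.map { case (instance, splits) =>
    val bucketizer = new Bucketizer()
      .setInputCol("value")
      .setOutputCol("value_range")
      .setSplits(splits)
    bucketizer.transform(df.filter(col("instance") === instance))
  }.reduce(_ unionByName _)
}

val bucketed = bucketizePerInstance(df, splits_map.map { case (k, v) => k -> v.map(_.toDouble) })
bucketed.groupBy("instance", "name", "value_range").sum("percentage").show()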
An alternative way to bucketize the data within each partition:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

def bucketizeWithinPartition(df: DataFrame, splits: Map[String, Array[Int]], partitionCol: String, featureCol: String): DataFrame = {
  val window = Window.partitionBy(partitionCol).orderBy($"bucket_start")

  // Turn the splits map into one row per bucket: (partition value, bucket_start, bucket_end, bucket id).
  val splitsDf = splits.toList.toDF(partitionCol, "splits")
    .withColumn("bucket_start", explode($"splits"))
    .withColumn("bucket_end", coalesce(lead($"bucket_start", 1).over(window), lit(Int.MaxValue)))
    .withColumn("bucket", row_number().over(window))

  // Join each data row to the bucket whose [bucket_start, bucket_end) range contains its value.
  val joinCond = "d.%s = s.%s AND d.%s >= s.bucket_start AND d.%s < s.bucket_end"
    .format(partitionCol, partitionCol, featureCol, featureCol)

  df.as("d")
    .join(splitsDf.as("s"), expr(joinCond), "inner")
    .select($"d.*", $"s.bucket")
}
val data = List(
  ("A37", "Histogram.ratio", 1, 0.20),
  ("A37", "Histogram.ratio", 20, 0.34),
  ("A37", "Histogram.ratio", 9000, 0.32),
  ("A49", "Histogram.ratio", 1, 0.50),
  ("A49", "Histogram.ratio", 20, 0.24)
).toDF("instance", "name", "value", "percentage")

val splits_map = Map("A37" -> Array(0,30,1000,5000,9000), "A49" -> Array(0,10,30,80,998))
val bucketedData = bucketizeWithinPartition(data, splits_map, "instance", "value")
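To get the per-bucket percentage sums the question asks for, the result of the helper above can be aggregated with groupBy and sum. Note that row_number() makes the bucket ids here 1-based, whereas the ML Bucketizer produces 0-based indices. A possible aggregation, using the column names from the code above:

// Sum percentages per instance and bucket; "bucket" is the 1-based id produced above.
bucketedData
  .groupBy("instance", "name", "bucket")
  .agg(sum("percentage").alias("percentage"))
  .orderBy("instance", "bucket")
  .show()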