Summing up consecutive values for each sequence in Scala
I have a dataset in which the Sequences column contains 0s and 1s:
Category Value Sequences
1 10 0
1 11 1
1 13 1
1 16 1
1 20 0
1 21 0
1 22 1
1 25 1
1 27 1
1 29 1
1 30 0
1 32 1
1 34 1
1 35 1
1 38 0
Here, runs of 1s in the Sequences column occur three times. I need to sum the Value column separately for each of those runs.
I am trying it with the following code:
%livy2.spark
import org.apache.spark.rdd.RDD

val df = df.select($"Category", $"Value", $"Sequences").rdd.groupBy(x =>
  x.getInt(0)
).map(x => {
  val Category = x(0).getInt(0)
  val Value = x(0).getInt(1)
  val Sequences = x(0).getInt(2)
  for (i <- x.indices) {
    val vi = x(i).getFloat(4)
    if (vi(0) > 0) {
      summing += Value
    }
    (Category, summing)
  }
})

df_new.take(10).foreach(println)
When I run this code I get an error saying the statement is incomplete.
The df value refers to the dataset I gave at the start.
The expected output is:
Category summing
1 40
1 103
1 101
I don't know where I am going wrong. It would be great if someone could help me learn this new stuff.
This can be done by assigning each row a unique id, and then placing each unit into the group delimited by the unique id of the next zero:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for $-interpolation and toDF

val df = Seq(
(1, 10, 0),
(1, 11, 1),
(1, 13, 1),
(1, 16, 1),
(1, 20, 0),
(1, 21, 0),
(1, 22, 1),
(1, 25, 1),
(1, 27, 1),
(1, 29, 1),
(1, 30, 0),
(1, 32, 1),
(1, 34, 1),
(1, 35, 1),
(1, 38, 0)
).toDF("Category", "Value", "Sequences")
// Assign each row a unique id
val zipped = df.withColumn("zip", monotonically_increasing_id())
// Build a range from each zero to the next zero
val categoryWindow = Window.partitionBy("Category").orderBy($"zip")
val groups = zipped
.filter($"Sequences" === 0)
.withColumn("rangeEnd", lead($"zip", 1).over(categoryWindow))
.withColumnRenamed("zip", "rangeStart")
println("Groups:")
groups.show(false)
// Assign a range to each unit
val joinCondition = ($"units.zip" > $"groups.rangeStart").and($"units.zip" < $"groups.rangeEnd")
val unitsByRange = zipped
.filter($"Sequences" === 1).alias("units")
.join(groups.alias("groups"), joinCondition, "left")
.select("units.Category", "units.Value", "groups.rangeStart")
println("Units in groups:")
unitsByRange.show(false)
// Group by range
val result = unitsByRange
.groupBy($"Category", $"rangeStart")
.agg(sum("Value").alias("summing"))
.orderBy("rangeStart")
.drop("rangeStart")
println("Result:")
result.show(false)
Output:
Groups:
+--------+-----+---------+----------+----------+
|Category|Value|Sequences|rangeStart|rangeEnd |
+--------+-----+---------+----------+----------+
|1 |10 |0 |0 |4 |
|1 |20 |0 |4 |5 |
|1 |21 |0 |5 |8589934595|
|1 |30 |0 |8589934595|8589934599|
|1 |38 |0 |8589934599|null |
+--------+-----+---------+----------+----------+
Units in groups:
+--------+-----+----------+
|Category|Value|rangeStart|
+--------+-----+----------+
|1 |11 |0 |
|1 |13 |0 |
|1 |16 |0 |
|1 |22 |5 |
|1 |25 |5 |
|1 |27 |5 |
|1 |29 |5 |
|1 |32 |8589934595|
|1 |34 |8589934595|
|1 |35 |8589934595|
+--------+-----+----------+
Result:
+--------+-------+
|Category|summing|
+--------+-------+
|1 |40 |
|1 |103 |
|1 |101 |
+--------+-------+
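As an alternative to the range join above, the same grouping can be expressed as a running count of zeros: each 0 row starts a new group id, and every following 1 row inherits it. The sketch below is hypothetical code, not from the answer above; it uses plain Scala collections (object and method names are made up) so the logic stays visible, but in Spark the group id would correspond to a windowed running sum of a zero-indicator column.

```scala
// Sketch: sum each run of 1s between zeros, using a running count
// of zeros as the group id (plain-collections illustration).
object GroupBySequence {
  // rows are (Category, Value, Sequences) tuples
  def sumRuns(rows: Seq[(Int, Int, Int)]): Seq[(Int, Int)] = {
    // Pair every row with the number of zeros seen so far (its group id)
    val withGroup = rows
      .scanLeft((0, (0, 0, 0))) { case ((gid, _), row) =>
        (if (row._3 == 0) gid + 1 else gid, row)
      }
      .tail // drop the seed element
    // Keep only Sequences == 1 rows and sum Value per (Category, group)
    withGroup
      .filter { case (_, (_, _, seq)) => seq == 1 }
      .groupBy { case (gid, (cat, _, _)) => (cat, gid) }
      .toSeq
      .sortBy { case ((_, gid), _) => gid }
      .map { case ((cat, _), grp) =>
        (cat, grp.map { case (_, (_, v, _)) => v }.sum)
      }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      (1, 10, 0), (1, 11, 1), (1, 13, 1), (1, 16, 1),
      (1, 20, 0), (1, 21, 0), (1, 22, 1), (1, 25, 1),
      (1, 27, 1), (1, 29, 1), (1, 30, 0), (1, 32, 1),
      (1, 34, 1), (1, 35, 1), (1, 38, 0)
    )
    // Prints (1,40), (1,103), (1,101) -- matching the expected output
    sumRuns(data).foreach(println)
  }
}
```

The running-count trick avoids the join and the reliance on id ordering, at the cost of needing a window (or a sequential scan) over the data.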