根据另一个 RDD 的值聚合一个 RDD Spark (Java)

Aggregating one RDD according to value of another RDD Spark (Java)

我有两个包含时间信息的RDD。 RDD 被分割成不同的分区。 一种是

形式
16:00:00
16:00:18
16:00:25
16:01:01
16:01:34
16:02:12
16:02:42
...

和另一个包含时间跨度的元组 2

<16:00:00, 16:00:59>
<16:01:00, 16:01:59>
<16:02:00, 16:02:59>
...

我需要聚合第一个和第二个 RDD,根据第二个中的值聚合第一个的值,以获得类似

的东西
<<16:00:00, 16:00:59>, [16:00:00,16:00:18,16:00:25]>
<<16:01:00, 16:01:59>, [16:01:01,16:01:34]>
<<16:02:00, 16:02:59>, [16:02:12,16:02:42]>
...

或者,换句话说,

<<16:00:00, 16:00:59>, 16:00:00>
<<16:00:00, 16:00:59>, 16:00:18>
<<16:00:00, 16:00:59>, 16:00:25>
<<16:01:00, 16:01:59>, 16:01:01>
<<16:01:00, 16:01:59>, 16:01:34>
<<16:02:00, 16:02:59>, 16:02:12>
<<16:02:00, 16:02:59>, 16:02:42>
...

我正在尝试使用整个范围的 spark 转换函数,但我很难找到一个适用于如此不同性质的 RDD 的函数。我知道我可能会选择 cartesian 产品,然后过滤,但我想要 "better" 解决方案。我试过 zipPartition,这可能有效,但我的分区可能不一致,例如16:00:00 可能最终出现在相应聚合值(元组 <16:00:00, 16:00:59>)不存在的分区中。 处理此问题的最佳方法是什么?

PS:我正在使用 Java,但也欢迎使用 Scala 解决方案。 谢谢

我已将下面的内容简化为使用整数,但我相信同样的事情也可以完成。虽然示例是在 Scala 中,但我怀疑它也可以在 Java 中完成。

如果范围是规则的,我会把 "values" RDD 变成 range,value 然后做一个简单的连接。

val values = Seq(1, 5, 10, 14, 20)
val valuesRdd = sc.parallelize(values, 2)
valuesRdd.map(x => (((x/10)*10, ((x/10)*10)+9), x)).collect

但是,如果范围不规则,则:

如果您不介意使用 DataFrame,那么一个选项是使用 用户定义的函数 根据给定范围内的 V in 创建一个列并加入那。

case class Range(low : Int, high :Int)
val ranges = Seq( Range(0,9), Range(10,19), Range(20,29));
val rangesDf = sc.parallelize(ranges, 2).toDF

case class Value(value : Int)
val values = Seq(Value(1), Value(5), Value(10), Value(14), Value(20))
val valuesDf = sc.parallelize(values, 2).toDF

val inRange = udf{(v: Int, low: Int, high : Int) => v >= low && v<= high}

rangesDf.join(valuesDf, inRange(valuesDf("value"), rangesDf("low"), rangesDf("high"))).show

下一个选择是 展开 超出范围并加入分解版本:

val explodedRange = rangesRdd.map(x => (x, List.range(x._1, x._2 + 1))).flatMap( { case (range, lst) => lst.map { x => (x, range)} })
val valuesRdd = sc.parallelize(values, 2).map(x => (x,true))
valuesRdd.join(explodedRange).map(x => (x._2._2, x._1)).collect