从 RDD 中的 2 个值添加一个新的计算列

Add a new calculated column from 2 values in RDD

我有 2 个配对的 RDD,我使用相同的键将它们连接在一起,现在我想使用值部分的 2 列添加一个新的计算列。新加入的RDD类型为:

RDD[((String, Int), Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))])]

我想向新的 RDD 添加另一个字段,它显示 2 DateTime 字段之间的增量。

我该怎么做?

你应该可以使用map将二元组扩展为三元组,大致如下:

joined.map{ case (key, values) =>
  val delta = computeDelta(values)
  (key, values, delta)
}

或者,更简洁:

joined.map{ case (k, vs) => (k, vs, computeDelta(vs)) }

然后您的 computeDelta 函数可以只提取类型 (String, DateTime, Int,Int) 的第一个和第二个值,从每个值中获取第二个项目 (DateTime) 并使用任何 DateTime 功能很方便。

如果你想让你的输出RDD仍然是成对的RDD,那么你就需要将新的delta字段包装成一个元组,大致如下:

joined.mapValues{ values =>
  val delta = computeDelta(values)
  (values, delta)
}

这将保留原始的 PairedRDD 键,并为您提供 (Iterable[(String, DateTime, Int,Int)], Long)

类型的值

(假设您正在计算 Long 类型的增量)