从 RDD 中的 2 个值添加一个新的计算列
Add a new calculated column from 2 values in RDD
我有 2 个配对的 RDD,我使用相同的键将它们连接在一起,现在我想使用值部分的 2 列添加一个新的计算列。新加入的RDD类型为:
RDD[((String, Int), Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))])]
我想向新的 RDD 添加另一个字段,它显示 2 DateTime
字段之间的增量。
我该怎么做?
你应该可以使用map
将二元组扩展为三元组,大致如下:
joined.map{ case (key, values) =>
val delta = computeDelta(values)
(key, values, delta)
}
或者,更简洁:
joined.map{ case (k, vs) => (k, vs, computeDelta(vs)) }
然后您的 computeDelta
函数可以只提取类型 (String, DateTime, Int,Int)
的第一个和第二个值,从每个值中获取第二个项目 (DateTime
) 并使用任何 DateTime
功能很方便。
如果你想让你的输出RDD仍然是成对的RDD,那么你就需要将新的delta字段包装成一个元组,大致如下:
joined.mapValues{ values =>
val delta = computeDelta(values)
(values, delta)
}
这将保留原始的 PairedRDD 键,并为您提供 (Iterable[(String, DateTime, Int,Int)], Long)
类型的值
(假设您正在计算 Long
类型的增量)
我有 2 个配对的 RDD,我使用相同的键将它们连接在一起,现在我想使用值部分的 2 列添加一个新的计算列。新加入的RDD类型为:
RDD[((String, Int), Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))])]
我想向新的 RDD 添加另一个字段,它显示 2 DateTime
字段之间的增量。
我该怎么做?
你应该可以使用map
将二元组扩展为三元组,大致如下:
joined.map{ case (key, values) =>
val delta = computeDelta(values)
(key, values, delta)
}
或者,更简洁:
joined.map{ case (k, vs) => (k, vs, computeDelta(vs)) }
然后您的 computeDelta
函数可以只提取类型 (String, DateTime, Int,Int)
的第一个和第二个值,从每个值中获取第二个项目 (DateTime
) 并使用任何 DateTime
功能很方便。
如果你想让你的输出RDD仍然是成对的RDD,那么你就需要将新的delta字段包装成一个元组,大致如下:
joined.mapValues{ values =>
val delta = computeDelta(values)
(values, delta)
}
这将保留原始的 PairedRDD 键,并为您提供 (Iterable[(String, DateTime, Int,Int)], Long)
(假设您正在计算 Long
类型的增量)