如何使用累加器统计leftOuterJoin中没有匹配项的记录?
How to use accumulator to count the records that have no matching items in leftOuterJoin?
Spark accumulators
是获取有关 RDD 操作的有用信息的好方法。
我的问题如下:我想在两个数据集之间执行连接,例如调用events
和 items
(其中事件是唯一的并且涉及项目,并且都由 item_id
键控,这是 items
的主要事件)
有效的是:
val joinedRDD = events.leftOuterJoin(items)
了解有多少事件没有匹配项的一种可能方法是写:
val numMissingItems = joinedRDD.map(x => if (x._2._2.isDefined) 0 else 1).sum
我的问题是:有没有办法用累加器获得这个计数?我不想通过 RDD 只是为了计数。
确实,您可以使用 cogroup
签名,然后自己执行 leftOuterJoin
执行的逻辑,并在不匹配的情况下增加累加器。然而,重要的是要注意,因为这是一个转换,所以有可能(例如,如果任务失败/重新计算)您的累加器 可能 过度计算记录数,尽管一般不会很多。这取决于你是否可以接受。
根据@Holden 的回答回答@Francis Toth 的要求:
这是基于spark的leftOuterJoin,唯一增加的是missingRightRecordsAcc += 1
部分。
函数定义:
object JoinerWithAccumulation {
def leftOuterJoinWithAccumulator[K: ClassTag, V, W](left: PairRDDFunctions[K, V],
right: RDD[(K, W)],
missingRightRecordsAcc: Accumulator[Int])
: RDD[(K, (V, Option[W]))] = {
left.cogroup(right).flatMapValues { pair =>
if (pair._2.isEmpty) {
pair._1.iterator.map(v => { missingRightRecordsAcc += 1; (v, None)})
} else {
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
}
}
}
}
用法:
val events = sc.textFile("...").parse...keyBy(_.getItemId)
val items = sc.textFile("...").parse...keyBy(_.getId)
val acc = sc.accumulator(0)
val joined = JoinerWithAccumulation.leftOuterJoinWithAccumulator(eventsKV,adsKV,acc)
println(acc.value) // 0, since there were no actions performed on the rdd 'joined'
println(joined.count) // = events.count ; this triggers an action
println(acc.value) // = number of records in joined without a matching record from 'items'
(最难的部分是正确定义函数,使用 ClassTag
等。)
Spark accumulators
是获取有关 RDD 操作的有用信息的好方法。
我的问题如下:我想在两个数据集之间执行连接,例如调用events
和 items
(其中事件是唯一的并且涉及项目,并且都由 item_id
键控,这是 items
的主要事件)
有效的是:
val joinedRDD = events.leftOuterJoin(items)
了解有多少事件没有匹配项的一种可能方法是写:
val numMissingItems = joinedRDD.map(x => if (x._2._2.isDefined) 0 else 1).sum
我的问题是:有没有办法用累加器获得这个计数?我不想通过 RDD 只是为了计数。
确实,您可以使用 cogroup
签名,然后自己执行 leftOuterJoin
执行的逻辑,并在不匹配的情况下增加累加器。然而,重要的是要注意,因为这是一个转换,所以有可能(例如,如果任务失败/重新计算)您的累加器 可能 过度计算记录数,尽管一般不会很多。这取决于你是否可以接受。
根据@Holden 的回答回答@Francis Toth 的要求:
这是基于spark的leftOuterJoin,唯一增加的是missingRightRecordsAcc += 1
部分。
函数定义:
object JoinerWithAccumulation {
def leftOuterJoinWithAccumulator[K: ClassTag, V, W](left: PairRDDFunctions[K, V],
right: RDD[(K, W)],
missingRightRecordsAcc: Accumulator[Int])
: RDD[(K, (V, Option[W]))] = {
left.cogroup(right).flatMapValues { pair =>
if (pair._2.isEmpty) {
pair._1.iterator.map(v => { missingRightRecordsAcc += 1; (v, None)})
} else {
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
}
}
}
}
用法:
val events = sc.textFile("...").parse...keyBy(_.getItemId)
val items = sc.textFile("...").parse...keyBy(_.getId)
val acc = sc.accumulator(0)
val joined = JoinerWithAccumulation.leftOuterJoinWithAccumulator(eventsKV,adsKV,acc)
println(acc.value) // 0, since there were no actions performed on the rdd 'joined'
println(joined.count) // = events.count ; this triggers an action
println(acc.value) // = number of records in joined without a matching record from 'items'
(最难的部分是正确定义函数,使用 ClassTag
等。)