如何使用累加器统计leftOuterJoin中没有匹配项的记录?

How to use accumulator to count the records that have no matching items in leftOuterJoin?

Spark accumulators 是获取有关 RDD 操作的有用信息的好方法。

我的问题如下:我想在两个数据集之间执行连接,例如调用eventsitems(其中事件是唯一的并且涉及项目,并且都由 item_id 键控,这是 items 的主要事件)

有效的是:

val joinedRDD = events.leftOuterJoin(items)

了解有多少事件没有匹配项的一种可能方法是写:

val numMissingItems = joinedRDD.map(x => if (x._2._2.isDefined) 0 else 1).sum

我的问题是:有没有办法用累加器获得这个计数?我不想通过 RDD 只是为了计数。

确实,您可以使用 cogroup 签名,然后自己执行 leftOuterJoin 执行的逻辑,并在不匹配的情况下增加累加器。然而,重要的是要注意,因为这是一个转换,所以有可能(例如,如果任务失败/重新计算)您的累加器 可能 过度计算记录数,尽管一般不会很多。这取决于你是否可以接受。

根据@Holden 的回答回答@Francis Toth 的要求:

这是基于spark的leftOuterJoin,唯一增加的是missingRightRecordsAcc += 1部分。

函数定义:

object JoinerWithAccumulation {
  def leftOuterJoinWithAccumulator[K: ClassTag, V, W](left: PairRDDFunctions[K, V],
                                                      right: RDD[(K, W)],
                                                      missingRightRecordsAcc: Accumulator[Int])
  : RDD[(K, (V, Option[W]))] = {
    left.cogroup(right).flatMapValues { pair =>
      if (pair._2.isEmpty) {
        pair._1.iterator.map(v => { missingRightRecordsAcc += 1; (v, None)})
      } else {
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
      }
    }
  }
}

用法:

val events = sc.textFile("...").parse...keyBy(_.getItemId)
val items = sc.textFile("...").parse...keyBy(_.getId)
val acc = sc.accumulator(0)

val joined = JoinerWithAccumulation.leftOuterJoinWithAccumulator(eventsKV,adsKV,acc)

println(acc.value) // 0, since there were no actions performed on the rdd 'joined'

println(joined.count) // = events.count ; this triggers an action
println(acc.value) // = number of records in joined without a matching record from 'items'

(最难的部分是正确定义函数,使用 ClassTag 等。)