如何依赖hash map过滤RDD?

How to filter RDD relying on hash map?

我刚开始使用 spark 和 scala,但我必须解决以下问题: 我有一个包含行的 ORC 文件,我必须根据来自哈希映射的特定条件检查这些行。

我用这种方式构建了包含 120,000 个条目的哈希映射(文件名、时间戳)(getTimestamp returns 一个 Option[Long] 类型):

val tgzFilesRDD = sc.textFile("...")
val fileNameTimestampRDD = tgzFilesRDD.map(itr => {
    (itr, getTimestamp(itr))
})
val fileNameTimestamp = fileNameTimestampRDD.collect.toMap

并像这样检索具有 600 万个条目的 RDD:

val sessionDataDF = sqlContext.read.orc("...")
case class SessionEvent(archiveName: String, eventTimestamp: Long)
val sessionEventsRDD = sessionDataDF.as[SessionEvent].rdd

并进行检查:

val sessionEventsToReport = sessionEventsRDD.filter(se => {
    val timestampFromFile = fileNameTimestamp.getOrElse(se.archiveName, None)
    se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})

这是正确且高效的方法吗?有推荐的缓存吗? 地图 fileNameTimestamp 会被洗牌到处理 parititons 的集群吗?

fileNameTimestamp 将为每个任务序列化,并且有 120,000 个条目,它可能非常昂贵。您应该广播大对象并引用广播变量:

val fileNameTimestampBC = sc.broadcast(fileNameTimestampRDD.collect.toMap)

现在这些物品中只有一个会被运送给每个工人。也不需要下拉到 RDD API,因为数据集 API 有一个过滤方法:

val sessionEvents = sessionDataDF.as[SessionEvent]
val sessionEventsToReport = sessionEvents.filter(se => {
    val timestampFromFile = fileNameTimestampBC.value.getOrElse(se.archiveName, None)
    se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})

collect 编辑的 fileNameTimestamp 映射存在于 Spark 主节点上。为了在查询中像这样有效地引用,工作节点需要访问它。这是由 broadcasting 完成的。

本质上,您重新发现了 Broadcast Hash Join:您需要将 sessionEventsRDD 与 tgzFilesRDD 结合起来以获取对可选时间戳的访问权限,然后进行相应的过滤。

使用 RDD 时,您需要显式编写连接策略代码。 Dataframes/Datasets API 有一个查询优化器可以为您做出选择。您还可以明确要求 API 在幕后使用上述广播连接技术。您可以找到这两种方法的示例 here.

让我知道这是否足够清楚:)