如何依赖hash map过滤RDD?
How to filter RDD relying on hash map?
我刚开始使用 spark 和 scala,但我必须解决以下问题:
我有一个包含行的 ORC 文件,我必须根据来自哈希映射的特定条件检查这些行。
我用这种方式构建了包含 120,000 个条目的哈希映射(文件名、时间戳)(getTimestamp returns 一个 Option[Long]
类型):
val tgzFilesRDD = sc.textFile("...")
val fileNameTimestampRDD = tgzFilesRDD.map(itr => {
(itr, getTimestamp(itr))
})
val fileNameTimestamp = fileNameTimestampRDD.collect.toMap
并像这样检索具有 600 万个条目的 RDD:
val sessionDataDF = sqlContext.read.orc("...")
case class SessionEvent(archiveName: String, eventTimestamp: Long)
val sessionEventsRDD = sessionDataDF.as[SessionEvent].rdd
并进行检查:
val sessionEventsToReport = sessionEventsRDD.filter(se => {
val timestampFromFile = fileNameTimestamp.getOrElse(se.archiveName, None)
se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})
这是正确且高效的方法吗?有推荐的缓存吗?
地图 fileNameTimestamp
会被洗牌到处理 parititons 的集群吗?
fileNameTimestamp 将为每个任务序列化,并且有 120,000 个条目,它可能非常昂贵。您应该广播大对象并引用广播变量:
val fileNameTimestampBC = sc.broadcast(fileNameTimestampRDD.collect.toMap)
现在这些物品中只有一个会被运送给每个工人。也不需要下拉到 RDD API,因为数据集 API 有一个过滤方法:
val sessionEvents = sessionDataDF.as[SessionEvent]
val sessionEventsToReport = sessionEvents.filter(se => {
val timestampFromFile = fileNameTimestampBC.value.getOrElse(se.archiveName, None)
se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})
您 collect
编辑的 fileNameTimestamp
映射存在于 Spark 主节点上。为了在查询中像这样有效地引用,工作节点需要访问它。这是由 broadcasting 完成的。
本质上,您重新发现了 Broadcast Hash Join:您需要将 sessionEventsRDD 与 tgzFilesRDD 结合起来以获取对可选时间戳的访问权限,然后进行相应的过滤。
使用 RDD 时,您需要显式编写连接策略代码。 Dataframes/Datasets API 有一个查询优化器可以为您做出选择。您还可以明确要求 API 在幕后使用上述广播连接技术。您可以找到这两种方法的示例 here.
让我知道这是否足够清楚:)
我刚开始使用 spark 和 scala,但我必须解决以下问题: 我有一个包含行的 ORC 文件,我必须根据来自哈希映射的特定条件检查这些行。
我用这种方式构建了包含 120,000 个条目的哈希映射(文件名、时间戳)(getTimestamp returns 一个 Option[Long]
类型):
val tgzFilesRDD = sc.textFile("...")
val fileNameTimestampRDD = tgzFilesRDD.map(itr => {
(itr, getTimestamp(itr))
})
val fileNameTimestamp = fileNameTimestampRDD.collect.toMap
并像这样检索具有 600 万个条目的 RDD:
val sessionDataDF = sqlContext.read.orc("...")
case class SessionEvent(archiveName: String, eventTimestamp: Long)
val sessionEventsRDD = sessionDataDF.as[SessionEvent].rdd
并进行检查:
val sessionEventsToReport = sessionEventsRDD.filter(se => {
val timestampFromFile = fileNameTimestamp.getOrElse(se.archiveName, None)
se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})
这是正确且高效的方法吗?有推荐的缓存吗?
地图 fileNameTimestamp
会被洗牌到处理 parititons 的集群吗?
fileNameTimestamp 将为每个任务序列化,并且有 120,000 个条目,它可能非常昂贵。您应该广播大对象并引用广播变量:
val fileNameTimestampBC = sc.broadcast(fileNameTimestampRDD.collect.toMap)
现在这些物品中只有一个会被运送给每个工人。也不需要下拉到 RDD API,因为数据集 API 有一个过滤方法:
val sessionEvents = sessionDataDF.as[SessionEvent]
val sessionEventsToReport = sessionEvents.filter(se => {
val timestampFromFile = fileNameTimestampBC.value.getOrElse(se.archiveName, None)
se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})
您 collect
编辑的 fileNameTimestamp
映射存在于 Spark 主节点上。为了在查询中像这样有效地引用,工作节点需要访问它。这是由 broadcasting 完成的。
本质上,您重新发现了 Broadcast Hash Join:您需要将 sessionEventsRDD 与 tgzFilesRDD 结合起来以获取对可选时间戳的访问权限,然后进行相应的过滤。
使用 RDD 时,您需要显式编写连接策略代码。 Dataframes/Datasets API 有一个查询优化器可以为您做出选择。您还可以明确要求 API 在幕后使用上述广播连接技术。您可以找到这两种方法的示例 here.
让我知道这是否足够清楚:)