了解火花过程行为
Understanding spark process behaviour
我想了解一个进程行为。基本上这个 spark 过程必须创建最多五个文件,每个区域一个,并将它们保存到 HDFS 中。
领土由五个字符串组成的数组提供。但是当我查看 spark UI 时,我看到多次执行相同的操作。
这些是我的问题:
- 为什么 isEmpty 操作对每个区域执行了 4 次而不是一次?我希望对领土采取一次行动。
- 计算isEmpty时任务个数是如何决定的?第一次只有一个任务,第二次任务是 4,第三次是 20,第四次是 35。这个大小背后的逻辑是什么?我可以通过某种方式控制该数字吗?
注意:是否有人有更好的大数据解决方案来实现相同的流程目标,请建议我。
这是 Spark 进程的代码摘录:
class IntegrationStatusD1RequestProcess {
logger.info(s"Retrieving all measurement point from DB")
val allMPoints = registryData.createIncrementalRegistryByMPointID()
.setName("allMPoints")
.persist(StorageLevel.MEMORY_AND_DISK)
logger.info("getTerritories return always an array of five String")
intStatusHelper.getTerritories.foreach { territory =>
logger.info(s"Retrieving measurement point for territory $territory")
val intStatusesChanged = allMPoints
.filter { m => m.getmPoint.substring(0, 3) == territory }
.setName(s"intStatusesChanged_${territory}")
.persist(StorageLevel.MEMORY_AND_DISK)
intStatusesChanged.isEmpty match {
case true => logger.info(s"No changes detected for territory")
case false =>
//create file and save it into hdfs
}
}
}
这是显示所有 spark 作业的图像:
以下前两张图片显示了 isEmpty 阶段:
如果您期望 isEmpty 为真,则效率低下!
这是 isEmpty
的 RDD 代码:
def isEmpty(): Boolean = withScope {
partitions.length == 0 || take(1).length == 0
}
它调用take
。如果你认为 RDD 不是 是空的,这是一个有效的实现,但如果你认为它 是 ,这是一个可怕的实现。
take
的实现遵循这个递归步骤,从 parts = 1 开始:
- 收集第一个 部分 个分区。
- 检查此结果是否包含 >=
n
项。
- 如果是,取第一个
n
- 如果否,请用
parts = parts * 4
重复步骤 1。
如果 RDD 的元素多于您想要的元素,此实现策略会让执行短路 take
,这通常是正确的。但是,如果您的 RDD 的元素少于您想要的 take
,您最终会计算分区 #1 log4(nPartitions) + 1
次,分区 #2-4 log4(nPartitions)
次,分区 #5-16 log4(nPartitions) - 1
次,以此类推。
此用例的更好实现
此实现通过牺牲短路能力仅计算每个分区一次:
def fasterIsEmpty(rdd: RDD[_]): Boolean = {
rdd.mapPartitions(it => Iterator(it.isEmpty))
.fold(true)(_ && _)
}
我想了解一个进程行为。基本上这个 spark 过程必须创建最多五个文件,每个区域一个,并将它们保存到 HDFS 中。
领土由五个字符串组成的数组提供。但是当我查看 spark UI 时,我看到多次执行相同的操作。
这些是我的问题:
- 为什么 isEmpty 操作对每个区域执行了 4 次而不是一次?我希望对领土采取一次行动。
- 计算isEmpty时任务个数是如何决定的?第一次只有一个任务,第二次任务是 4,第三次是 20,第四次是 35。这个大小背后的逻辑是什么?我可以通过某种方式控制该数字吗?
注意:是否有人有更好的大数据解决方案来实现相同的流程目标,请建议我。
这是 Spark 进程的代码摘录:
class IntegrationStatusD1RequestProcess {
logger.info(s"Retrieving all measurement point from DB")
val allMPoints = registryData.createIncrementalRegistryByMPointID()
.setName("allMPoints")
.persist(StorageLevel.MEMORY_AND_DISK)
logger.info("getTerritories return always an array of five String")
intStatusHelper.getTerritories.foreach { territory =>
logger.info(s"Retrieving measurement point for territory $territory")
val intStatusesChanged = allMPoints
.filter { m => m.getmPoint.substring(0, 3) == territory }
.setName(s"intStatusesChanged_${territory}")
.persist(StorageLevel.MEMORY_AND_DISK)
intStatusesChanged.isEmpty match {
case true => logger.info(s"No changes detected for territory")
case false =>
//create file and save it into hdfs
}
}
}
这是显示所有 spark 作业的图像:
以下前两张图片显示了 isEmpty 阶段:
如果您期望 isEmpty 为真,则效率低下!
这是 isEmpty
的 RDD 代码:
def isEmpty(): Boolean = withScope {
partitions.length == 0 || take(1).length == 0
}
它调用take
。如果你认为 RDD 不是 是空的,这是一个有效的实现,但如果你认为它 是 ,这是一个可怕的实现。
take
的实现遵循这个递归步骤,从 parts = 1 开始:
- 收集第一个 部分 个分区。
- 检查此结果是否包含 >=
n
项。 - 如果是,取第一个
n
- 如果否,请用
parts = parts * 4
重复步骤 1。
如果 RDD 的元素多于您想要的元素,此实现策略会让执行短路 take
,这通常是正确的。但是,如果您的 RDD 的元素少于您想要的 take
,您最终会计算分区 #1 log4(nPartitions) + 1
次,分区 #2-4 log4(nPartitions)
次,分区 #5-16 log4(nPartitions) - 1
次,以此类推。
此用例的更好实现
此实现通过牺牲短路能力仅计算每个分区一次:
def fasterIsEmpty(rdd: RDD[_]): Boolean = {
rdd.mapPartitions(it => Iterator(it.isEmpty))
.fold(true)(_ && _)
}