了解火花过程行为

Understanding spark process behaviour

我想了解一个进程行为。基本上这个 spark 过程必须创建最多五个文件,每个区域一个,并将它们保存到 HDFS 中。

领土由五个字符串组成的数组提供。但是当我查看 spark UI 时,我看到多次执行相同的操作。

这些是我的问题:

注意:是否有人有更好的大数据解决方案来实现相同的流程目标,请建议我。

这是 Spark 进程的代码摘录:

class IntegrationStatusD1RequestProcess {

  logger.info(s"Retrieving all measurement point from DB")
  val allMPoints = registryData.createIncrementalRegistryByMPointID()
    .setName("allMPoints")
    .persist(StorageLevel.MEMORY_AND_DISK)

  logger.info("getTerritories return always an array of five String")
  intStatusHelper.getTerritories.foreach { territory =>

    logger.info(s"Retrieving measurement point for territory $territory")
    val intStatusesChanged = allMPoints
      .filter { m => m.getmPoint.substring(0, 3) == territory }
      .setName(s"intStatusesChanged_${territory}")
      .persist(StorageLevel.MEMORY_AND_DISK)

    intStatusesChanged.isEmpty match {
      case true => logger.info(s"No changes detected for territory")
      case false =>

      //create file and save it into hdfs

    }
  }
}

这是显示所有 spark 作业的图像:

以下前两张图片显示了 isEmpty 阶段:

如果您期望 isEmpty 为真,则效率低下!

这是 isEmpty 的 RDD 代码:

def isEmpty(): Boolean = withScope {
  partitions.length == 0 || take(1).length == 0
}

它调用take。如果你认为 RDD 不是 是空的,这是一个有效的实现,但如果你认为它 ,这是一个可怕的实现。

take 的实现遵循这个递归步骤,从 parts = 1 开始:

  1. 收集第一个 部分 个分区。
  2. 检查此结果是否包含 >= n 项。
  3. 如果是,取第一个n
  4. 如果否,请用 parts = parts * 4 重复步骤 1。

如果 RDD 的元素多于您想要的元素,此实现策略会让执行短路 take,这通常是正确的。但是,如果您的 RDD 的元素少于您想要的 take,您最终会计算分区 #1 log4(nPartitions) + 1 次,分区 #2-4 log4(nPartitions) 次,分区 #5-16 log4(nPartitions) - 1次,以此类推。

此用例的更好实现

此实现通过牺牲短路能力仅计算每个分区一次:

def fasterIsEmpty(rdd: RDD[_]): Boolean = {
  rdd.mapPartitions(it => Iterator(it.isEmpty))
    .fold(true)(_ && _)
}