Tuning Spark Job

I am trying to tune the process below, because I keep hitting a Java heap space error.

Looking at the Spark UI, there is a cogroup that behaves in a very strange way. Up to that stage everything seems well balanced (for now the number of partitions is hardcoded to 48). Inside the method loadParentMPoint there is the cogroup transformation; basically, when the next count is about to be executed the cogroup is computed and 48 tasks are scheduled, but 47 of them terminate immediately (they seem to process nothing), while the remaining one starts doing shuffle read until it fills up the heap space and the exception is raised.

I launched the process several times with the same dataset and the outcome is always the same. Every time only one executor does the work, whereas before this stage the load is well balanced.

Why do I get this behavior? Maybe I am missing something. I tried to repartition the data before the cogroup because I assumed it was unbalanced, but it did not help; the same happened when I tried partitionBy.
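For reference, this is roughly what I tried (a minimal sketch, not my exact code, using the RDD names from the excerpt below; the HashPartitioner and the hardcoded 48 partitions are assumptions about my setup):

    // Attempt 1: plain repartition of the left-hand RDD before the cogroup
    val rddNewRepartitioned = rddNew.repartition(48)

    // Attempt 2: hash-partition the left-hand RDD and pass the same partitioner to cogroup
    import org.apache.spark.HashPartitioner
    val partitioner = new HashPartitioner(48)
    val p = rddNew.partitionBy(partitioner).cogroup(rddRegistryFactoryParent, partitioner)

Neither attempt changed the behavior: one task still did all the shuffle read.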

Here is an excerpt of the code:

    class BillingOrderGeneratorProcess extends SparkApplicationErrorHandler {

    implicit val ctx = sc
    val log = LoggerFactory.getLogger(classOf[BillingOrderGeneratorProcess])
    val ipc = new Handler[ConsumptionComputationBigDataIPC]
    val billingOrderDao = new Handler[BillingOrderDao]
    val mPointDao = new Handler[MeasurementPointDAO]
    val billingOrderBDao = new Handler[BillingOrderBDAO]
    val ccmDiscardBdao = new Handler[CCMDiscardBDAO]
    val ccmService = new Handler[ConsumptionComputationBillingService]
    val registry = new Handler[IncrementalRegistryTableData]
    val podTimeZoneHelper = new Handler[PodDateTimeUtils]
    val billingPodStatusDao = new Handler[BillingPodStatusBDAO]
    val config = new Handler[PropertyManager]
    val paramFacade = new Handler[ConsumptionParameterFacade]
    val consumptionMethods = new Handler[ConsumptionMethods]
    val partitions = config.get.defaultPartitions()
    val appName = sc.appName
    val appId = sc.applicationId
    val now = new DateTime

    val extracted = ctx.accumulator(0L, "Extracted from planning")
    val generated = ctx.accumulator(0L, "Billing orders generated")
    val discarded = ctx.accumulator(0L, "Billing orders discarded")

    // initialize staging
    val staging = new TxStagingTable(config.get().billingOrderGeneratorStagingArea())
    staging.prepareReading

    val rddExtractedFromPlanning = staging
        .read[ExtractedPO]()
        .repartition(48)
        .setName("rddExtractedFromPlanning")
        .cache 

    val rddExtracted = rddExtractedFromPlanning
      .filter { x =>
        extracted += 1
        (x.getExtracted == EExtractedType.EXTRACTED ||
         x.getExtracted == EExtractedType.EXTRACTED_BY_USER ||
         x.getExtracted == EExtractedType.EXTRACTED_BY_TDC)
      }
      .map { x =>
        log.info("1:extracted>{}", x)
        val bo = MapperUtil.mapExtractedPOtoBO(x)
        bo
      }

    val podWithExtractedAndLastBillingOrderPO = rddExtracted.map { e =>
      val billOrdr = CCMIDGenerator.newIdentifier(CCMIDGenerator.Context.GENERATOR, e.getPod, e.getCycle(), e.getExtractionDate())
      val last = billingOrderDao.get.getLastByPodExcludedActual(e.getPod, billOrdr)
      log.info("2:last Billing order>{}", last);
      (e.getPod, e, last)
    }
      .setName("podWithExtractedAndLastBillingOrderPO")
      .cache()

    val podWithExtractedAndLastBillingOrder = podWithExtractedAndLastBillingOrderPO.map(e => (e._1, (e._2, MapperUtil.mapBillingOrderPOtoBO(e._3))))

    val  rddRegistryFactoryKeys = podWithExtractedAndLastBillingOrderPO
      .map(e => (e._1,1))
      .reduceByKey(_+_)
      .keys

    val rddRegistryFactory = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryKeys, List())

    val rddExtractedWithMPoint = ConsumptionComputationUtil
      .groupPodWithMPoint(podWithExtractedAndLastBillingOrder, rddRegistryFactory)
      .filter { e =>
        val mPoint = e._3
        val condition = mPoint != null
        if (!condition) log.error("MPoint is NULL for POD -> " + e._1)
        condition
      }
      .setName("rddExtractedWithMPoint")
      .cache

    rddExtractedWithMPoint.count

    val rddExtractedWithMPointWithParent = ConsumptionComputationUtil
      .groupWithParent(rddExtractedWithMPoint)
      .map{
        case (pod, extracted, measurementPoint, billOrder, parentMpointId, factory) =>
          if (!parentMpointId.isEmpty) {
            val mPointParent = mPointDao.get.findByMPoint(parentMpointId.get)
            log.info("2.1:parentMpoin>Mpoint=" + parentMpointId + " parent for pod -> " + pod)
            (pod, extracted, measurementPoint, billOrder, mPointParent.getPod, factory)
          } else {
            log.info("2.1:parentMpoin>Mpoint=null parent for pod -> " + pod)
            (pod, extracted, measurementPoint, billOrder, null, factory)
          }
      }
        .setName("rddExtractedWithMPointWithParent")
        .cache()

    rddExtractedWithMPointWithParent.count

    val rddRegistryFactoryParentKeys = rddExtractedWithMPointWithParent
      .filter(e => Option(e._5).isDefined)
      .map(e => (e._5,1))
      .reduceByKey(_+_)
      .keys

    rddRegistryFactoryParentKeys.count

    val rddRegistryFactoryParent = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryParentKeys, List())

    rddRegistryFactoryParent.count

    val imprb = new Handler[IncrementalMeasurementPointRegistryBuilder]

    val rddNew = rddExtractedWithMPointWithParent.map({
      case (pod, extracted, measurementPoint, billingOrder, parentPod, factory) =>
        (parentPod, (pod, extracted, measurementPoint, billingOrder, factory))
    })
    rddNew.count

    val p = rddNew.cogroup(rddRegistryFactoryParent)
    p.count

    val rddExtractedWithMPointWithMpointParent = p
      .filter { case (pod, (inputs, mpFactories)) => inputs.nonEmpty }
      .flatMap { case (pod, (inputs, mpFactories)) =>
        val factory = mpFactories.headOption // possibly one factory, or none
        val results = inputs.map { e =>
          val measurementPointTupla = factory.flatMap { f =>
            Option((imprb.get.buildSparkDecorator(new MeasurementPointFactoryAdapter(f)).getMeasurementPointByDate(e._2.getRequestDate), f))
          }
          val tupla = measurementPointTupla.getOrElse(null)
          val toBeBilled = if (tupla != null && tupla._1 != null) false else true
          val m = if (tupla != null && tupla._1 != null) tupla._1 else null
          val f = if (tupla != null && tupla._2 != null) tupla._2 else null
          (e._1, e._2, e._3, e._4, m, toBeBilled, e._5, f)
        }
        results
      }
      .setName("rddExtractedWithMPointWithMpointParent")
      .cache()

    rddExtractedWithMPointWithMpointParent.foreach({ e =>
      log.info("2.2:parentMpoint>MpointComplete=" + e._5 + " parent for pod -> " + e._1)
    })
    }

These are the stages of the two RDDs involved in the cogroup operation. rddNew:

rddRegistryFactory:

This is the stage of the cogroup:

This is the storage situation:

This is the Executors tab:

N.B. I added the count actions only for debugging purposes.

Update:

  • I strongly suspect this Java heap space error is caused by the cached RDDs, which, judging from your last screenshot (the Storage tab), do not seem necessary (see the unpersist sketch after this list).

Depending on how many times a dataset is accessed and the amount of work involved in recomputing it, recomputation can be faster than the price paid by the increased memory pressure.

It should go without saying that if you only read a dataset once there is no point in caching it; it will actually make your job slower.

  • For debug counts you can use countApprox() instead of count (a usage sketch follows this list). Once the testing is done you can remove it so the job does the real work.

  • Most importantly, make sure that your data is uniform by printing the number of records per partition... If needed you can repartition or coalesce.

    You can get the number of records per partition like this:

    // note: .toDF requires the SparkSession implicits in scope, e.g. import spark.implicits._
    df
      .rdd
      .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
      .toDF("partition_number", "number_of_records")
      .show
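On the caching point above: if a cached RDD turns out not to be worth keeping, it can be released explicitly. A minimal sketch, reusing one of the RDD names from the code excerpt (which RDDs are actually worth dropping is something to verify against the Storage tab):

    // Release the cached blocks of an RDD that is no longer reused,
    // or simply remove the .cache() call so Spark recomputes it from the lineage.
    rddExtractedFromPlanning.unpersist()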
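For the approximate debug counts mentioned above, a sketch of how countApprox() can replace the count calls (the 5-second timeout and 0.90 confidence are arbitrary values):

    // Returns a partial result after at most 5 seconds, with a 90% confidence interval,
    // instead of waiting for an exact count of every partition.
    val approx = rddExtractedWithMPoint.countApprox(5000L, 0.90)
    log.info("approximate count: {}", approx.initialValue.mean)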

I solved it; the problem was related to partitioning. Basically, the data in the RDD calling the cogroup operation had all the keys with the same value, so when the cogroup happens Spark tries to hash partition both RDDs, bringing the keys of both RDDs onto the same executor in order to cogroup them.
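A quick way to confirm this kind of skew before the cogroup is to look at the key frequencies; a minimal sketch (the take(10) cut-off is arbitrary):

    // Count how many records share each key; a single dominant key explains why
    // one hash partition, and therefore one task on one executor, gets all the data.
    rddNew
      .mapValues(_ => 1L)
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .take(10)
      .foreach { case (key, cnt) => log.info("key={} count={}", key, cnt) }

When a single key dominates like this, repartition or partitionBy before the cogroup cannot rebalance the work, because every record with that key still hashes to the same partition.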