Tensorflow Java 在 YARN 上使用 spark 占用过多内存

Tensorflow Java use too much memory with spark on YARN

当使用 tensorflow java 进行推理时,在 YARN 上进行作业 运行 的内存量异常大。这项工作 运行 在我的电脑(2 核 16Gb RAM)上与 spark 完美结合,需要 35 分钟才能完成。但是当我尝试在 YARN 上使用 10 个执行程序 16Gb 内存和 16 Gb memoryOverhead 运行 它时,执行程序因使用过多内存而被杀死。

使用 YARN 2.7.3 和 Spark 2.2.1 在 Hortonworks 集群上进行预测 运行。之前我们使用 DL4J 进行推理,一切都 运行 不到 3 分钟。 Tensor 在使用后正确关闭,我们使用 mapPartition 进行预测。每个任务包含大约 20.000 条记录 (1Mb),因此这将使输入张量为 2.000.000x14,输出张量为 2.000.000 (5Mb)。

在 YARN

上 运行ning 时将选项传递给 spark
--master yarn --deploy-mode cluster --driver-memory 16G --num-executors 10 --executor-memory 16G --executor-cores 2 --conf spark.driver.memoryOverhead=16G --conf spark.yarn.executor.memoryOverhead=16G --conf spark.sql.shuffle.partitions=200 --conf spark.tasks.cpu=2

如果我们设置 spark.sql.shuffle.partitions=2000,此配置可能会起作用,但需要 3 个小时

更新:

本地和集群之间的差异实际上是由于缺少过滤器。实际上,我们 运行 预测的数据比我们想象的要多。

要减少每个分区的内存占用,您必须在每个分区内创建批处理(使用 grouped(batchSize))。因此,您比 运行 对每一行的预测速度更快,并且您分配了预定大小 (batchSize) 的张量。如果您调查 code of tensorflowOnSpark scala 推理,这就是他们所做的。您将在下面找到一个重做的实现示例,此代码可能无法编译,但您知道如何编译。

    lazy val sess = SavedModelBundle.load(modelPath, "serve").session
    lazy val numberOfFeatures = 1
    lazy val laggedFeatures = Seq("cost_day1", "cost_day2", "cost_day3")
    lazy val numberOfOutputs = 1
    val predictionsRDD = preprocessedData.rdd.mapPartitions { partition =>
        partition.grouped(batchSize).flatMap { batchPreprocessed =>
          val numberOfLines = batchPreprocessed.size
          val featuresShape: Array[Long] = Array(numberOfLines, laggedFeatures.size / numberOfFeatures, numberOfFeatures)

          val featuresBuffer: FloatBuffer = FloatBuffer.allocate(numberOfLines)

          for (
            featuresWithKey <- batchPreprocessed;
            feature <- featuresWithKey.features
          ) {
            featuresBuffer.put(feature)
          }
          featuresBuffer.flip()
          val featuresTensor = Tensor.create(featuresShape, featuresBuffer)

          val results: Tensor[_] = sess.runner
            .feed("cost", featuresTensor)
            .fetch("prediction")
            .run.get(0)

          val output = Array.ofDim[Float](results.numElements(), numberOfOutputs)
          val outputArray: Array[Array[Float]] = results.copyTo(output)

          results.close()
          featuresTensor.close()
          outputArray
        }
    }
    spark.createDataFrame(predictionsRDD)

我们按照this issue

中的建议使用 FloatBuffer 和 Shape 创建 Tensor