Tensorflow Java 在 YARN 上使用 spark 占用过多内存
Tensorflow Java use too much memory with spark on YARN
当使用 tensorflow java 进行推理时,在 YARN 上进行作业 运行 的内存量异常大。这项工作 运行 在我的电脑(2 核 16Gb RAM)上与 spark 完美结合,需要 35 分钟才能完成。但是当我尝试在 YARN 上使用 10 个执行程序 16Gb 内存和 16 Gb memoryOverhead 运行 它时,执行程序因使用过多内存而被杀死。
使用 YARN 2.7.3 和 Spark 2.2.1 在 Hortonworks 集群上进行预测 运行。之前我们使用 DL4J 进行推理,一切都 运行 不到 3 分钟。
Tensor 在使用后正确关闭,我们使用 mapPartition 进行预测。每个任务包含大约 20.000 条记录 (1Mb),因此这将使输入张量为 2.000.000x14,输出张量为 2.000.000 (5Mb)。
在 YARN
上 运行ning 时将选项传递给 spark
--master yarn --deploy-mode cluster --driver-memory 16G --num-executors 10 --executor-memory 16G --executor-cores 2 --conf spark.driver.memoryOverhead=16G --conf spark.yarn.executor.memoryOverhead=16G --conf spark.sql.shuffle.partitions=200 --conf spark.tasks.cpu=2
如果我们设置 spark.sql.shuffle.partitions=2000,此配置可能会起作用,但需要 3 个小时
更新:
本地和集群之间的差异实际上是由于缺少过滤器。实际上,我们 运行 预测的数据比我们想象的要多。
要减少每个分区的内存占用,您必须在每个分区内创建批处理(使用 grouped(batchSize)
)。因此,您比 运行 对每一行的预测速度更快,并且您分配了预定大小 (batchSize) 的张量。如果您调查 code of tensorflowOnSpark scala 推理,这就是他们所做的。您将在下面找到一个重做的实现示例,此代码可能无法编译,但您知道如何编译。
lazy val sess = SavedModelBundle.load(modelPath, "serve").session
lazy val numberOfFeatures = 1
lazy val laggedFeatures = Seq("cost_day1", "cost_day2", "cost_day3")
lazy val numberOfOutputs = 1
val predictionsRDD = preprocessedData.rdd.mapPartitions { partition =>
partition.grouped(batchSize).flatMap { batchPreprocessed =>
val numberOfLines = batchPreprocessed.size
val featuresShape: Array[Long] = Array(numberOfLines, laggedFeatures.size / numberOfFeatures, numberOfFeatures)
val featuresBuffer: FloatBuffer = FloatBuffer.allocate(numberOfLines)
for (
featuresWithKey <- batchPreprocessed;
feature <- featuresWithKey.features
) {
featuresBuffer.put(feature)
}
featuresBuffer.flip()
val featuresTensor = Tensor.create(featuresShape, featuresBuffer)
val results: Tensor[_] = sess.runner
.feed("cost", featuresTensor)
.fetch("prediction")
.run.get(0)
val output = Array.ofDim[Float](results.numElements(), numberOfOutputs)
val outputArray: Array[Array[Float]] = results.copyTo(output)
results.close()
featuresTensor.close()
outputArray
}
}
spark.createDataFrame(predictionsRDD)
我们按照this issue
中的建议使用 FloatBuffer 和 Shape 创建 Tensor
当使用 tensorflow java 进行推理时,在 YARN 上进行作业 运行 的内存量异常大。这项工作 运行 在我的电脑(2 核 16Gb RAM)上与 spark 完美结合,需要 35 分钟才能完成。但是当我尝试在 YARN 上使用 10 个执行程序 16Gb 内存和 16 Gb memoryOverhead 运行 它时,执行程序因使用过多内存而被杀死。
使用 YARN 2.7.3 和 Spark 2.2.1 在 Hortonworks 集群上进行预测 运行。之前我们使用 DL4J 进行推理,一切都 运行 不到 3 分钟。 Tensor 在使用后正确关闭,我们使用 mapPartition 进行预测。每个任务包含大约 20.000 条记录 (1Mb),因此这将使输入张量为 2.000.000x14,输出张量为 2.000.000 (5Mb)。
在 YARN
上 运行ning 时将选项传递给 spark--master yarn --deploy-mode cluster --driver-memory 16G --num-executors 10 --executor-memory 16G --executor-cores 2 --conf spark.driver.memoryOverhead=16G --conf spark.yarn.executor.memoryOverhead=16G --conf spark.sql.shuffle.partitions=200 --conf spark.tasks.cpu=2
如果我们设置 spark.sql.shuffle.partitions=2000,此配置可能会起作用,但需要 3 个小时
更新:
本地和集群之间的差异实际上是由于缺少过滤器。实际上,我们 运行 预测的数据比我们想象的要多。
要减少每个分区的内存占用,您必须在每个分区内创建批处理(使用 grouped(batchSize)
)。因此,您比 运行 对每一行的预测速度更快,并且您分配了预定大小 (batchSize) 的张量。如果您调查 code of tensorflowOnSpark scala 推理,这就是他们所做的。您将在下面找到一个重做的实现示例,此代码可能无法编译,但您知道如何编译。
lazy val sess = SavedModelBundle.load(modelPath, "serve").session
lazy val numberOfFeatures = 1
lazy val laggedFeatures = Seq("cost_day1", "cost_day2", "cost_day3")
lazy val numberOfOutputs = 1
val predictionsRDD = preprocessedData.rdd.mapPartitions { partition =>
partition.grouped(batchSize).flatMap { batchPreprocessed =>
val numberOfLines = batchPreprocessed.size
val featuresShape: Array[Long] = Array(numberOfLines, laggedFeatures.size / numberOfFeatures, numberOfFeatures)
val featuresBuffer: FloatBuffer = FloatBuffer.allocate(numberOfLines)
for (
featuresWithKey <- batchPreprocessed;
feature <- featuresWithKey.features
) {
featuresBuffer.put(feature)
}
featuresBuffer.flip()
val featuresTensor = Tensor.create(featuresShape, featuresBuffer)
val results: Tensor[_] = sess.runner
.feed("cost", featuresTensor)
.fetch("prediction")
.run.get(0)
val output = Array.ofDim[Float](results.numElements(), numberOfOutputs)
val outputArray: Array[Array[Float]] = results.copyTo(output)
results.close()
featuresTensor.close()
outputArray
}
}
spark.createDataFrame(predictionsRDD)
我们按照this issue
中的建议使用 FloatBuffer 和 Shape 创建 Tensor