尝试在 Apache Spark 中将 Dataframe 写入 CSV 时行为不一致
Inconsistent behaviour when attempting to write Dataframe to CSV in Apache Spark
我正在尝试使用 Dataframes 和 spark-csv 将我使用 Spark 的 MLlib 训练的决策树分类器的最佳超参数输出到 csv 文件。这是我的代码片段:
// Split the data into training and test sets (10% held out for testing)
val Array(trainingData, testData) = assembledData.randomSplit(Array(0.9, 0.1))
// Define cross validation with a hyperparameter grid
val crossval = new CrossValidator()
.setEstimator(classifier)
.setEstimatorParamMaps(paramGrid)
.setEvaluator(new BinaryClassificationEvaluator)
.setNumFolds(10)
// Train model
val model = crossval.fit(trainingData)
// Find best hyperparameter combination and create an RDD
val bestModel = model.bestModel
val hyperparamList = new ListBuffer[(String, String)]()
bestModel.extractParamMap().toSeq.foreach(pair => {
val hyperparam: Tuple2[String,String] = (pair.param.name,pair.value.toString)
hyperparamList += hyperparam
})
val hyperparameters = sqlContext.sparkContext.parallelize(hyperparamList.toSeq)
// Print the best hyperparameters
println(bestModel.extractParamMap().toSeq.foreach(pair => {
println(s"${pair.param.parent} ${pair.param.name}")
println(pair.value)
}))
// Define csv path to output results
var csvPath: String = "/root/results/decision-tree"
val hyperparametersPath: String = csvPath+"/hyperparameters"
val hyperparametersFile: File = new File(hyperparametersPath)
val results = (hyperparameters, hyperparametersPath, hyperparametersFile)
// Convert RDD to Dataframe and write it as csv
val dfToSave = spark.createDataFrame(results._1.map(x => Row(x._1, x._2)))
dfToSave.write.format("csv").mode("overwrite").save(results._2)
// Stop spark session
spark.stop()
完成 Spark 作业后,我可以按预期在路径中看到 part-00*... 和 _SUCCESS 文件。然而,尽管在这种情况下总共有 13 个超参数(通过在屏幕上打印它们来确认),cat
-ing csv 文件显示并非每个超参数都被写入 csv:
user@master:~$ cat /root/results/decision-tree/hyperparameters/part*.csv
checkpointInterval,10
featuresCol,features
maxDepth,5
minInstancesPerNode,1
此外,在每次执行中写入的超参数都会发生变化。这是在基于 HDFS 的 Spark 集群上执行的,该集群具有 1 个主节点和 3 个具有完全相同硬件的工作节点。这可能是竞争条件吗?如果是,我该如何解决?
提前致谢。
我想我明白了。我希望 dfTosave.write.format("csv")save(path)
将所有内容写入主节点,但由于任务已分发给所有工作人员,因此每个工作人员都将其部分超参数保存到其文件系统中的本地 CSV 文件中。因为在我的例子中,主节点也是一个工作节点,所以我可以看到它的部分超参数。 "inconsistent behaviour"(即在每次执行中看到不同的部分)是由 Spark 用于在工作人员之间分配分区的任何算法引起的。
我的解决方案是使用 scp
或 rsync
之类的方法从所有工作人员那里收集 CSV,以构建完整的结果。
我正在尝试使用 Dataframes 和 spark-csv 将我使用 Spark 的 MLlib 训练的决策树分类器的最佳超参数输出到 csv 文件。这是我的代码片段:
// Split the data into training and test sets (10% held out for testing)
val Array(trainingData, testData) = assembledData.randomSplit(Array(0.9, 0.1))
// Define cross validation with a hyperparameter grid
val crossval = new CrossValidator()
.setEstimator(classifier)
.setEstimatorParamMaps(paramGrid)
.setEvaluator(new BinaryClassificationEvaluator)
.setNumFolds(10)
// Train model
val model = crossval.fit(trainingData)
// Find best hyperparameter combination and create an RDD
val bestModel = model.bestModel
val hyperparamList = new ListBuffer[(String, String)]()
bestModel.extractParamMap().toSeq.foreach(pair => {
val hyperparam: Tuple2[String,String] = (pair.param.name,pair.value.toString)
hyperparamList += hyperparam
})
val hyperparameters = sqlContext.sparkContext.parallelize(hyperparamList.toSeq)
// Print the best hyperparameters
println(bestModel.extractParamMap().toSeq.foreach(pair => {
println(s"${pair.param.parent} ${pair.param.name}")
println(pair.value)
}))
// Define csv path to output results
var csvPath: String = "/root/results/decision-tree"
val hyperparametersPath: String = csvPath+"/hyperparameters"
val hyperparametersFile: File = new File(hyperparametersPath)
val results = (hyperparameters, hyperparametersPath, hyperparametersFile)
// Convert RDD to Dataframe and write it as csv
val dfToSave = spark.createDataFrame(results._1.map(x => Row(x._1, x._2)))
dfToSave.write.format("csv").mode("overwrite").save(results._2)
// Stop spark session
spark.stop()
完成 Spark 作业后,我可以按预期在路径中看到 part-00*... 和 _SUCCESS 文件。然而,尽管在这种情况下总共有 13 个超参数(通过在屏幕上打印它们来确认),cat
-ing csv 文件显示并非每个超参数都被写入 csv:
user@master:~$ cat /root/results/decision-tree/hyperparameters/part*.csv
checkpointInterval,10
featuresCol,features
maxDepth,5
minInstancesPerNode,1
此外,在每次执行中写入的超参数都会发生变化。这是在基于 HDFS 的 Spark 集群上执行的,该集群具有 1 个主节点和 3 个具有完全相同硬件的工作节点。这可能是竞争条件吗?如果是,我该如何解决?
提前致谢。
我想我明白了。我希望 dfTosave.write.format("csv")save(path)
将所有内容写入主节点,但由于任务已分发给所有工作人员,因此每个工作人员都将其部分超参数保存到其文件系统中的本地 CSV 文件中。因为在我的例子中,主节点也是一个工作节点,所以我可以看到它的部分超参数。 "inconsistent behaviour"(即在每次执行中看到不同的部分)是由 Spark 用于在工作人员之间分配分区的任何算法引起的。
我的解决方案是使用 scp
或 rsync
之类的方法从所有工作人员那里收集 CSV,以构建完整的结果。