优化 Spark Scala 作业 - 许多任务,嵌套映射需要数小时,XML 解析
Optimize Spark Scala Job - Many Tasks, Nested mapping taking hours, XML Parse
我正在 运行在 spark-shell 中执行一个 spark 作业,它已经执行了将近 80 多个小时,必须有一些方法来分散它。下面是我在启动作业时提交的配置和正在运行的代码。
spark-shell --master \
yarn \
--num-executors 100 \
--name cde_test \
--executor-cores 4 \
--executor-memory 5g \
--driver-cores 2 \
--driver-memory 3g \
--jars ./spark_jars/spark-xml_2.11-0.8.0.jar \
--verbose
这是资源管理器 UI 工具上执行程序信息的图片:
spark_ui_executor_screenshot
我想用 spark-xml 解析 XML 个文件并提取某些字段并保存为 CSV。我认为增加执行者的数量会加快工作速度,因为这些都是小而快速的低内存任务,但不确定我是否做对了,或者代码的编写方式是否会阻止并行执行。下面的代码,以及所有帮助表示赞赏。
import org.apache.hadoop.fs._
import collection.mutable._
import spark.implicits._
import java.io.File
import java.util.regex.Pattern
import org.apache.spark.sql._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.util.control.Exception._
import org.apache.commons.io.FilenameUtils
import org.apache.commons.lang.StringEscapeUtils
import org.apache.hadoop.conf.Configuration
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
object HdfsUtils {
def pathExists(path: String, sc: SparkContext): Boolean = {
val conf = sc.hadoopConfiguration
val fs = FileSystem.get(conf)
fs.exists(new Path(path))
}
def getFullPath(path:String, sc: SparkContext): String = {
val conf = sc.hadoopConfiguration
val fs = FileSystem.get(conf)
fs.getFileStatus(new Path(path)).getPath().toString
}
def getAllFiles(path:String, sc: SparkContext): Seq[String] = {
val conf = sc.hadoopConfiguration
val fs = FileSystem.get(conf)
val files = fs.listStatus(new Path(path))
files.map(_.getPath().toString)
}
}
//Four different mapping functions
val path_list = Seq("path_1_for_first_directory",
"path_2_for_second_directory")
path_list.foreach ( path => {
val hdfs_directory = HdfsUtils.getAllFiles(path, sc)
hdfs_directory.foreach( intermediate_folder => {
val intermediate_folders = HdfsUtils.getAllFiles(intermediate_folder, sc)
intermediate_folders.foreach( final_folder => {
val hdfs_files = HdfsUtils.getAllFiles(final_folder, sc)
hdfs_files.foreach( xml_file => {
val date = raw"(\d{4})-(\d{2})-(\d{2})".r
val directory_date = date.findFirstIn(xml_file).
getOrElse(xml_file)
//Ignore meta files
if (xml_file.contains("META") || xml_file.contains("meta")){
} else if (xml_file.contains(".xml") || xml_file.contains(".XML")){
try{
val xml_df = spark.
read.
format("xml").
option("rowTag","root").
option("treatEmptyValuesAsNulls","true").
option("nullValue", null).
option("emptyValue", null).
load(xml_file)
val info_df = xml_df.
select(
substring($"column_1",0,8).alias("date"),
substring($"column_2",9,20).alias("time"),
$"column_3".alias("first_name").cast("string"),
$"column_4".alias("last_name").cast("string"),
$"column_5".alias("birthday").cast("string"),
$"column_6".alias("street").cast("string"),
$"column_7".alias("city").cast("string"),
$"column_8".alias("state").cast("string"),
$"column_9".alias("zip_code").cast("string"),
$"column_10".alias("country").cast("string")
)
val outputfile = "/path_to_output/"
var filename = s"$directory_date"
var outputFileName = outputfile + filename
info_df.write
.format("csv")
.option("header", "false")
.option("sep","|")
.mode("append")
.save(outputFileName)
}
catch{
case _: RuntimeException => {}
case _: Exception => {}
}
}
})
})
})
})
您在 Seq 上使用 foreach
,它是连续的(正如 Hristo Iliev 提到的)。如果您有很多文件,而且大部分文件都比较小,那么一次处理一个文件可能就是速度慢的原因。
- 您可以使用通配符而不是遍历 HDFS 文件。您可以一次将多个文件读取到一个更大的 DataFrame 中;例如,在这里,我们一次处理整整一个月:
spark.read.format("xml").load("/somepath/*/YYYY-MM-*.xml")
注意 /*/
代表 "intermediate directory"。什么对你更好可能取决于这些中间目录是否有更具体的模式,或者它们是否也取决于日期。
- 如果你还想按日期组织多个输出文件,使用
partitionBy
和info_df
,这样更大的DataFrame可以根据一个键输出到多个日期directories/files。这是一个示例:Write to multiple outputs by key Spark - one Spark job
我正在 运行在 spark-shell 中执行一个 spark 作业,它已经执行了将近 80 多个小时,必须有一些方法来分散它。下面是我在启动作业时提交的配置和正在运行的代码。
spark-shell --master \
yarn \
--num-executors 100 \
--name cde_test \
--executor-cores 4 \
--executor-memory 5g \
--driver-cores 2 \
--driver-memory 3g \
--jars ./spark_jars/spark-xml_2.11-0.8.0.jar \
--verbose
这是资源管理器 UI 工具上执行程序信息的图片: spark_ui_executor_screenshot
我想用 spark-xml 解析 XML 个文件并提取某些字段并保存为 CSV。我认为增加执行者的数量会加快工作速度,因为这些都是小而快速的低内存任务,但不确定我是否做对了,或者代码的编写方式是否会阻止并行执行。下面的代码,以及所有帮助表示赞赏。
import org.apache.hadoop.fs._
import collection.mutable._
import spark.implicits._
import java.io.File
import java.util.regex.Pattern
import org.apache.spark.sql._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.util.control.Exception._
import org.apache.commons.io.FilenameUtils
import org.apache.commons.lang.StringEscapeUtils
import org.apache.hadoop.conf.Configuration
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
object HdfsUtils {
def pathExists(path: String, sc: SparkContext): Boolean = {
val conf = sc.hadoopConfiguration
val fs = FileSystem.get(conf)
fs.exists(new Path(path))
}
def getFullPath(path:String, sc: SparkContext): String = {
val conf = sc.hadoopConfiguration
val fs = FileSystem.get(conf)
fs.getFileStatus(new Path(path)).getPath().toString
}
def getAllFiles(path:String, sc: SparkContext): Seq[String] = {
val conf = sc.hadoopConfiguration
val fs = FileSystem.get(conf)
val files = fs.listStatus(new Path(path))
files.map(_.getPath().toString)
}
}
//Four different mapping functions
val path_list = Seq("path_1_for_first_directory",
"path_2_for_second_directory")
path_list.foreach ( path => {
val hdfs_directory = HdfsUtils.getAllFiles(path, sc)
hdfs_directory.foreach( intermediate_folder => {
val intermediate_folders = HdfsUtils.getAllFiles(intermediate_folder, sc)
intermediate_folders.foreach( final_folder => {
val hdfs_files = HdfsUtils.getAllFiles(final_folder, sc)
hdfs_files.foreach( xml_file => {
val date = raw"(\d{4})-(\d{2})-(\d{2})".r
val directory_date = date.findFirstIn(xml_file).
getOrElse(xml_file)
//Ignore meta files
if (xml_file.contains("META") || xml_file.contains("meta")){
} else if (xml_file.contains(".xml") || xml_file.contains(".XML")){
try{
val xml_df = spark.
read.
format("xml").
option("rowTag","root").
option("treatEmptyValuesAsNulls","true").
option("nullValue", null).
option("emptyValue", null).
load(xml_file)
val info_df = xml_df.
select(
substring($"column_1",0,8).alias("date"),
substring($"column_2",9,20).alias("time"),
$"column_3".alias("first_name").cast("string"),
$"column_4".alias("last_name").cast("string"),
$"column_5".alias("birthday").cast("string"),
$"column_6".alias("street").cast("string"),
$"column_7".alias("city").cast("string"),
$"column_8".alias("state").cast("string"),
$"column_9".alias("zip_code").cast("string"),
$"column_10".alias("country").cast("string")
)
val outputfile = "/path_to_output/"
var filename = s"$directory_date"
var outputFileName = outputfile + filename
info_df.write
.format("csv")
.option("header", "false")
.option("sep","|")
.mode("append")
.save(outputFileName)
}
catch{
case _: RuntimeException => {}
case _: Exception => {}
}
}
})
})
})
})
您在 Seq 上使用 foreach
,它是连续的(正如 Hristo Iliev 提到的)。如果您有很多文件,而且大部分文件都比较小,那么一次处理一个文件可能就是速度慢的原因。
- 您可以使用通配符而不是遍历 HDFS 文件。您可以一次将多个文件读取到一个更大的 DataFrame 中;例如,在这里,我们一次处理整整一个月:
spark.read.format("xml").load("/somepath/*/YYYY-MM-*.xml")
注意 /*/
代表 "intermediate directory"。什么对你更好可能取决于这些中间目录是否有更具体的模式,或者它们是否也取决于日期。
- 如果你还想按日期组织多个输出文件,使用
partitionBy
和info_df
,这样更大的DataFrame可以根据一个键输出到多个日期directories/files。这是一个示例:Write to multiple outputs by key Spark - one Spark job