Parsing Data in Apache Spark Scala org.apache.spark.SparkException: Task not serializable error when trying to use textinputformat.record.delimiter
Input file:
___DATE___
2018-11-16T06:3937
Linux hortonworks 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:39:37 up 100 days, 1:04, 2 users, load average: 9.01, 8.30, 8.48
06:30:01 AM all 6.08 0.00 2.83 0.04 0.00 91.06
___DATE___
2018-11-16T06:4037
Linux cloudera 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:40:37 up 100 days, 1:05, 28 users, load average: 8.39, 8.26, 8.45
06:40:01 AM all 6.92 1.11 1.88 0.04 0.00 90.05
Desired output:
2018-11-16T06:3937,hortonworks, 2 users
2018-11-16T06:4037,cloudera, 28 users
I am just getting started with Spark using Scala. I am trying to parse this input file with Spark 2.3.1 and Scala 2.11.6. Here is my code.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.LongWritable
import org.apache.spark.{SparkConf, SparkContext}

object parse_stats extends App {
  case class LoadSchema(date: String)

  val conf = new SparkConf().setAppName("ParseStats").setMaster("local[*]")
  val sc = new SparkContext(conf)

  val hadoopConf = new Configuration(sc.hadoopConfiguration)
  hadoopConf.set("textinputformat.record.delimiter", "___DATE___")

  val input = sc.newAPIHadoopFile("C:\\Users\\rohit\\Documents\\dataset\\sys_stats.log",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hadoopConf)
    .map(line => line._2.toString)

  lazy val date_pattern = "[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
  lazy val uname_pattern = "[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
  lazy val cpu_regex = "[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r

  val transformRDD = input.map { eachline => (
    (if (date_pattern.pattern.matcher(eachline).matches()) eachline),                           // collects date
    (if (uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\\s+")(1).trim()),  // collects hostname
    (if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim())          // collects cpu users
  )}

  transformRDD.collect().foreach(println)
}
If I run the code from IntelliJ, I get the following output.
((),(),())
((),(),())
((),(),())
If I run it from spark-shell, I get the following error:
scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text
scala> import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.LongWritable
scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}
scala> val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml
scala> hadoopConf.set("textinputformat.record.delimiter","___DATE___")
scala> val input = sc.newAPIHadoopFile("C:\Users\rnimmal1\Documents\dataset\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)
input: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:37
scala>
scala> lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
date_pattern: scala.util.matching.Regex = <lazy>
scala> lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
uname_pattern: scala.util.matching.Regex = <lazy>
scala> lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r
cpu_regex: scala.util.matching.Regex = <lazy>
scala>
scala> val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
| (if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\s+")(1).trim() ), //collects hostname
| (if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
| )
| }
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
... 54 elided
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
- object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml)
- field (class: $iw, name: hadoopConf, type: class org.apache.hadoop.conf.Configuration)
- object (class $iw, $iw@63fa0b9)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3f4b52fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@338f9bb5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3d63becf)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3aca7082)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4ccfd904)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6e4e7a62)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5aaab2b0)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5c51a7eb)
- field (class: $line36.$read, name: $iw, type: class $iw)
- object (class $line36.$read, $line36.$read@2ba3b4a6)
- field (class: $iw, name: $line36$read, type: class $line36.$read)
- object (class $iw, $iw@6559f04e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@8f7cbcc)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@465b16bb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@373efaa2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5f2896fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@f777d41)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@43ec41d7)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@61c0a61)
- field (class: $line38.$read, name: $iw, type: class $iw)
- object (class $line38.$read, $line38.$read@10d1f6da)
- field (class: $iw, name: $line38$read, type: class $line38.$read)
- object (class $iw, $iw@2095e085)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@380cb7e3)
- field (class: $anonfun, name: $outer, type: class $iw)
- object (class $anonfun, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
... 63 more
What am I missing?
After changing the ___DATE___ delimiter to a pipe "|", the code snippet below produces the desired output. Please note that I am on a Windows platform, so I replace the "\r". Please check it out.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark_test").master("local[*]").getOrCreate()
import spark.implicits._

val file1 = spark.sparkContext.textFile("./in/machine_logs.txt")
spark.sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter", "|")

val file2 = file1
  .filter(line => { val x = line.split("""\n"""); x.length > 5 })
  .map(line => {
    val x = line.split("""\n""")
    val p = x(2).replaceAll("\r", "")   // not needed on a Unix platform
    val q = x(3).split(" ")(1)
    val r = x(4).split(",")(2)
    p + "," + q + "," + r
  })

file2.collect.foreach(println)
// file2.saveAsTextFile("./in/machine_logs.out")  // comment out the line above and uncomment this one to save the output to a file
Output:
2018-11-16T06:3937,hortonworks, 2 users
2018-11-16T06:4037,cloudera, 28 users
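
Note: the snippet above assumes the ___DATE___ markers in the log file have already been replaced with a pipe before Spark reads it. A hedged example of that one-off pre-processing step (the file paths here are placeholders, not from the original post):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Hypothetical pre-processing: rewrite the record delimiter before Spark reads the file.
val raw = new String(Files.readAllBytes(Paths.get("./in/machine_logs_raw.txt")), StandardCharsets.UTF_8)
Files.write(Paths.get("./in/machine_logs.txt"), raw.replace("___DATE___", "|").getBytes(StandardCharsets.UTF_8))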
Update 1:
Using regex matching:
val date_pattern = "[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
val uname_pattern = "(Linux) (.*?) [0-9a-zA-z-#() . : _ /]+(GNU/Linux)".r
val cpu_regex = """(.+),(.*?),\s+(load average)[:][0-9 . ,]+""".r

val file2 = file1
  .filter(line => { val x = line.split("""\n"""); x.length > 5 })
  .map(line => {
    var q = ""; var r = ""
    val p = date_pattern.findFirstIn(line).mkString                                    // date
    uname_pattern.findAllIn(line).matchData.foreach(m => { q = m.group(2).mkString })  // hostname (2nd capture group)
    cpu_regex.findAllIn(line).matchData.foreach(m => { r = m.group(2).mkString })      // user count (2nd capture group)
    p + "," + q + "," + r
  })

file2.collect.foreach(println)
I think the problem is that you define those filter objects (date_pattern and friends) outside of the RDD operation, so Spark has to ship the whole parse_stats object to every executor, and it cannot do that because the whole object is not serializable. This does not happen when you run in local mode, because nothing needs to be shipped to other executors.
See the excellent answer here: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
This gist has some quick and easy ways to avoid the serialization problem: https://gist.github.com/kmader/1d64e64621e63d566f67
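
As a minimal sketch of that advice, and of the Configuration field the stack trace flags as non-serializable (object and file names below are placeholders, not from the original post): set the delimiter on the SparkContext's own Hadoop configuration and create the patterns inside the closure, so nothing non-serializable gets pulled into the task.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object ParseStatsNoCapture extends App {
  val sc = new SparkContext(new SparkConf().setAppName("ParseStats").setMaster("local[*]"))

  // Set the delimiter on the SparkContext's own configuration instead of keeping
  // a separate Configuration value that the closure (or the REPL wrapper) could capture.
  sc.hadoopConfiguration.set("textinputformat.record.delimiter", "___DATE___")

  val input = sc
    .newAPIHadoopFile("sys_stats.log", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], sc.hadoopConfiguration)
    .map(_._2.toString)

  val parsed = input.map { record =>
    // Patterns defined inside the closure: only this lambda is serialized,
    // not the enclosing object and not any Hadoop Configuration.
    val datePattern = "[0-9]+-[0-9]+-[0-9]+T[0-9:]+".r
    datePattern.findFirstIn(record).getOrElse("")
  }

  parsed.collect().foreach(println)
  sc.stop()
}

In the spark-shell, marking a value like hadoopConf with @transient is another commonly suggested workaround, since the REPL otherwise stores it as a field of the wrapper object that gets serialized along with the closure.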