在 standalone/master-slave spark-shell 中从 Parquet 读取时的不同行为
Different behaviour when reading from Parquet in standalone/master-slave spark-shell
这是我用来在 Scala 中从 Parquet 读取数据帧的较大代码的片段。
case class COOMatrix(row: Seq[Long], col: Seq[Long], data: Seq[Double])
def buildMatrix(cooMatrixFields: DataFrame) = {
val cooMatrices = cooMatrixFields map {
case Row(r,c,d) => COOMatrix(r.asInstanceOf[Seq[Long]], c.asInstanceOf[Seq[Long]], d.asInstanceOf[Seq[Double]])
}
val matEntries = cooMatrices.zipWithIndex.flatMap {
case (cooMat, matIndex) =>
val rowOffset = cooMat.row.distinct.size
val colOffset = cooMat.col.distinct.size
val cooMatRowShifted = cooMat.row.map(rowEntry => rowEntry + rowOffset * matIndex)
val cooMatColShifted = cooMat.col.map(colEntry => colEntry + colOffset * matIndex)
(cooMatRowShifted, cooMatColShifted, cooMat.data).zipped.map {
case (i, j, value) => MatrixEntry(i, j, value)
}
}
new CoordinateMatrix(matEntries)
}
val C_entries = sqlContext.read.load(s"${dataBaseDir}/C.parquet")
val C = buildMatrix(C_entries)
我的代码在本地 spark 上下文中运行 时成功执行。
在 独立集群 上,同样的代码一旦到达强制它实际从 Parquet 读取的操作就会失败。
正确检索数据框的架构:
C_entries: org.apache.spark.sql.DataFrame = [C_row: array<bigint>, C_col: array<bigint>, C_data: array<double>]
但是执行程序在执行这一行时崩溃 val C = buildMatrix(C_entries)
,除了这个例外:
java.lang.ExceptionInInitializerError
at $line39.$read$$iwC.<init>(<console>:7)
at $line39.$read.<init>(<console>:61)
at $line39.$read$.<init>(<console>:65)
at $line39.$read$.<clinit>(<console>)
at $line67.$read$$iwC.<init>(<console>:7)
at $line67.$read.<init>(<console>:24)
at $line67.$read$.<init>(<console>:28)
at $line67.$read$.<clinit>(<console>)
at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:63)
at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:62)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun.apply(ZippedWithIndexRDD.scala:52)
at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun.apply(ZippedWithIndexRDD.scala:52)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at $line4.$read$$iwC$$iwC.<init>(<console>:15)
at $line4.$read$$iwC.<init>(<console>:24)
at $line4.$read.<init>(<console>:26)
at $line4.$read$.<init>(<console>:30)
at $line4.$read$.<clinit>(<console>)
... 22 more
不确定是否相关,但在增加日志详细程度的同时,我注意到了这个异常:
16/03/07 20:59:38 INFO GenerateUnsafeProjection: Code generated in 157.285464 ms
16/03/07 20:59:38 DEBUG ExecutorClassLoader: Did not load class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection from REPL class server at http://155.198.193.158:32862
java.lang.ClassNotFoundException: Class file not found at URL http://155.198.193.158:32862/org/apache/spark/sql/catalyst/expressions/GeneratedClass%24SpecificUnsafeProjection.class
我为独立集群尝试了不同的配置:
- 主机、1 个从机和 spark-shell 运行 在我的笔记本电脑上
- master 和 1 slave 每个 运行 在不同的机器上,spark-shell 在我的笔记本电脑上
- 一台机器上的master和spark-shell,另一台机器上的1个slave
我从默认属性开始,然后演变为更复杂的属性文件,但没有取得更多成功:
spark.driver.memory 4g
spark.rpc=netty
spark.eventLog.enabled true
spark.eventLog.dir file:///mnt/fastmp/spark_workdir/logs
spark.driver.extraJavaOptions -Xmx20480m -XX:MaxPermSize=2048m -XX:ReservedCodeCacheSize=2048m
spark.shuffle.service.enabled true
spark.shuffle.consolidateFiles true
spark.sql.parquet.binaryAsString true
spark.speculation false
spark.rpc.timeout 1000
spark.rdd.compress true
spark.core.connection.ack.wait.timeout 600
spark.driver.maxResultSize 0
spark.task.maxFailures 3
spark.shuffle.io.maxRetries 3
我是 运行 spark-1.6.0-bin-hadoop2.6 的预构建版本。
此部署中不涉及 HDFS,所有 Parquet 文件都存储在所有机器都可用的共享装载 (CephFS) 上。
我怀疑这与底层文件系统有关,因为我的代码的另一部分在本地和独立模式下都能正常读取不同的 Parquet 文件。
TL;DR:将您的代码打包为 jar
出于记录目的,问题似乎与使用独立集群有关。
完全相同的代码适用于这些设置:
- spark-shell和master在同一台机器上
- 运行在 YARN(AWS EMR 集群)上运行并从 S3 读取 parquet 文件
在独立设置的日志中进一步挖掘,问题似乎与 class 服务器的异常有关:
INFO GenerateUnsafeProjection: Code generated in 157.285464 ms
DEBUG ExecutorClassLoader: Did not load class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection from REPL class server at http://155.198.193.158:32862
java.lang.ClassNotFoundException: Class file not found at URL http://155.198.193.158:32862/org/apache/spark/sql/catalyst/expressions/GeneratedClass%24SpecificUnsafeProjection.class
我的理解是 spark-shell
启动一个 HTTP 服务器 (jetty) 以服务它从代码中生成的 classes REPL 给工人。
在我的例子中,很多 classes 都被成功提供(我什至设法通过 telnet 检索了一些)。但是 class GeneratedClass
(及其所有内部 classes)无法被 class 服务器找到。
日志中出现的典型错误信息是:
DEBUG Server: RESPONSE /org/apache/spark/sql/catalyst/expressions/GeneratedClass.class 404 handled=true
我的想法是,它与 master 和 spark-shell 在同一台服务器上工作,因为它们 运行 在同一 JVM 中,因此即使 HTTP 也可以找到 class传输失败。
到目前为止我找到的唯一成功的解决方案是 构建一个 jar 包 并使用 spark-shell
的 --jars
选项或将其作为spark-submit
.
的参数
这是我用来在 Scala 中从 Parquet 读取数据帧的较大代码的片段。
case class COOMatrix(row: Seq[Long], col: Seq[Long], data: Seq[Double])
def buildMatrix(cooMatrixFields: DataFrame) = {
val cooMatrices = cooMatrixFields map {
case Row(r,c,d) => COOMatrix(r.asInstanceOf[Seq[Long]], c.asInstanceOf[Seq[Long]], d.asInstanceOf[Seq[Double]])
}
val matEntries = cooMatrices.zipWithIndex.flatMap {
case (cooMat, matIndex) =>
val rowOffset = cooMat.row.distinct.size
val colOffset = cooMat.col.distinct.size
val cooMatRowShifted = cooMat.row.map(rowEntry => rowEntry + rowOffset * matIndex)
val cooMatColShifted = cooMat.col.map(colEntry => colEntry + colOffset * matIndex)
(cooMatRowShifted, cooMatColShifted, cooMat.data).zipped.map {
case (i, j, value) => MatrixEntry(i, j, value)
}
}
new CoordinateMatrix(matEntries)
}
val C_entries = sqlContext.read.load(s"${dataBaseDir}/C.parquet")
val C = buildMatrix(C_entries)
我的代码在本地 spark 上下文中运行 时成功执行。
在 独立集群 上,同样的代码一旦到达强制它实际从 Parquet 读取的操作就会失败。 正确检索数据框的架构:
C_entries: org.apache.spark.sql.DataFrame = [C_row: array<bigint>, C_col: array<bigint>, C_data: array<double>]
但是执行程序在执行这一行时崩溃 val C = buildMatrix(C_entries)
,除了这个例外:
java.lang.ExceptionInInitializerError
at $line39.$read$$iwC.<init>(<console>:7)
at $line39.$read.<init>(<console>:61)
at $line39.$read$.<init>(<console>:65)
at $line39.$read$.<clinit>(<console>)
at $line67.$read$$iwC.<init>(<console>:7)
at $line67.$read.<init>(<console>:24)
at $line67.$read$.<init>(<console>:28)
at $line67.$read$.<clinit>(<console>)
at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:63)
at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:62)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun.apply(ZippedWithIndexRDD.scala:52)
at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun.apply(ZippedWithIndexRDD.scala:52)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at $line4.$read$$iwC$$iwC.<init>(<console>:15)
at $line4.$read$$iwC.<init>(<console>:24)
at $line4.$read.<init>(<console>:26)
at $line4.$read$.<init>(<console>:30)
at $line4.$read$.<clinit>(<console>)
... 22 more
不确定是否相关,但在增加日志详细程度的同时,我注意到了这个异常:
16/03/07 20:59:38 INFO GenerateUnsafeProjection: Code generated in 157.285464 ms
16/03/07 20:59:38 DEBUG ExecutorClassLoader: Did not load class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection from REPL class server at http://155.198.193.158:32862
java.lang.ClassNotFoundException: Class file not found at URL http://155.198.193.158:32862/org/apache/spark/sql/catalyst/expressions/GeneratedClass%24SpecificUnsafeProjection.class
我为独立集群尝试了不同的配置:
- 主机、1 个从机和 spark-shell 运行 在我的笔记本电脑上
- master 和 1 slave 每个 运行 在不同的机器上,spark-shell 在我的笔记本电脑上
- 一台机器上的master和spark-shell,另一台机器上的1个slave
我从默认属性开始,然后演变为更复杂的属性文件,但没有取得更多成功:
spark.driver.memory 4g
spark.rpc=netty
spark.eventLog.enabled true
spark.eventLog.dir file:///mnt/fastmp/spark_workdir/logs
spark.driver.extraJavaOptions -Xmx20480m -XX:MaxPermSize=2048m -XX:ReservedCodeCacheSize=2048m
spark.shuffle.service.enabled true
spark.shuffle.consolidateFiles true
spark.sql.parquet.binaryAsString true
spark.speculation false
spark.rpc.timeout 1000
spark.rdd.compress true
spark.core.connection.ack.wait.timeout 600
spark.driver.maxResultSize 0
spark.task.maxFailures 3
spark.shuffle.io.maxRetries 3
我是 运行 spark-1.6.0-bin-hadoop2.6 的预构建版本。 此部署中不涉及 HDFS,所有 Parquet 文件都存储在所有机器都可用的共享装载 (CephFS) 上。
我怀疑这与底层文件系统有关,因为我的代码的另一部分在本地和独立模式下都能正常读取不同的 Parquet 文件。
TL;DR:将您的代码打包为 jar
出于记录目的,问题似乎与使用独立集群有关。
完全相同的代码适用于这些设置:
- spark-shell和master在同一台机器上
- 运行在 YARN(AWS EMR 集群)上运行并从 S3 读取 parquet 文件
在独立设置的日志中进一步挖掘,问题似乎与 class 服务器的异常有关:
INFO GenerateUnsafeProjection: Code generated in 157.285464 ms
DEBUG ExecutorClassLoader: Did not load class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection from REPL class server at http://155.198.193.158:32862
java.lang.ClassNotFoundException: Class file not found at URL http://155.198.193.158:32862/org/apache/spark/sql/catalyst/expressions/GeneratedClass%24SpecificUnsafeProjection.class
我的理解是 spark-shell
启动一个 HTTP 服务器 (jetty) 以服务它从代码中生成的 classes REPL 给工人。
在我的例子中,很多 classes 都被成功提供(我什至设法通过 telnet 检索了一些)。但是 class GeneratedClass
(及其所有内部 classes)无法被 class 服务器找到。
日志中出现的典型错误信息是:
DEBUG Server: RESPONSE /org/apache/spark/sql/catalyst/expressions/GeneratedClass.class 404 handled=true
我的想法是,它与 master 和 spark-shell 在同一台服务器上工作,因为它们 运行 在同一 JVM 中,因此即使 HTTP 也可以找到 class传输失败。
到目前为止我找到的唯一成功的解决方案是 构建一个 jar 包 并使用 spark-shell
的 --jars
选项或将其作为spark-submit
.