java.lang.StackOverflowError thrown in spark-submit but not when running in the IDE
I developed a Spark 2.2 application for collaborative filtering. It runs and debugs fine in IntelliJ, and I can also open the Spark Web UI to watch the job. But when I tried to deploy it to EMR and first test it locally with spark-submit, the program did not run correctly.
Part of the spark-submit command:
spark-submit -v --master local[*] --deploy-mode client --executor-memory 4G --num-executors 10 --conf spark.executor.extraJavaOptions="-Xss200M " --conf spark.executor.memory="500M"
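One note on this command (my own observation, not part of the original post): with --master local[*] there are no separate executor JVMs, so spark.executor.extraJavaOptions, and with it the -Xss200M, is never applied; everything, including the serialization that fails below, runs inside the single driver JVM. If raising the stack size were the goal, the option would have to reach the driver instead, along the lines of:

    spark-submit -v --master local[*] --deploy-mode client --driver-memory 4G --driver-java-options "-Xss200M" ...

That said, the fix further down suggests the real problem is the data structure being serialized, not the stack size itself.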
def finalStep(sc: SparkContext): Unit = {
  val sameModel = MatrixFactorizationModel.load(sc, "CollaborativeFilter")

  val globalInterestStats = mutable.Map[
    Int, (DescriptiveStatistics, mutable.MutableList[Rating])
  ]()

  val taxonsForUsers = sameModel.recommendProductsForUsers(200)

  taxonsForUsers
    .collect()
    .flatMap(userToInterestArr => {
      userToInterestArr._2.map(rating => {
        if (globalInterestStats.get(rating.product).isEmpty) {
          globalInterestStats(rating.product) = (
            new DescriptiveStatistics(),
            mutable.MutableList[Rating]()
          )
        }
        globalInterestStats(rating.product)._1.addValue(rating.rating)

        (rating, userToInterestArr._2)
      })
    })
    .foreach(ratingToUserInterestArr => {
      val rating = ratingToUserInterestArr._1

      if (globalInterestStats.get(rating.product).isDefined) {
        val interestStats = globalInterestStats(rating.product)
        val userInterests = ratingToUserInterestArr._2

        if (rating.rating >= interestStats._1.getPercentile(75)) {
          userInterests.foreach(each => interestStats._2 += each)
        }
      }
    })

  println(globalInterestStats.toSeq.length) // ~300

  val globalInterestRDD = sc.parallelize(globalInterestStats.toSeq, 100) // No. of partitions does not matter

  val nGlobalInterests = globalInterestStats.map(each => each._2._2.length).sum

  // This was not working in spark-submit, but I managed to simplify this part of the code before creating the RDD
  val taxonIDFMap = sc.parallelize(
    globalInterestStats
      .toSeq
      .flatMap(each => {
        each._2._2
          .foldLeft(mutable.Map[Int, Double]())(op = (accu, value) => {
            if (accu.get(value.product).isEmpty) {
              accu(value.product) = 1
            } else {
              accu(value.product) += 1
            }
            accu
          })
          .toList
      }), 100)
    .reduceByKey((accu, value) => accu + value)
    .map(each => {
      val a: Double = Math.log10(nGlobalInterests / (1 + each._2)) / Math.log10(2)
      (
        each._1,
        a
      )
    })
    .collect()
    .toMap

  // Yet I have a far more complicated task that needs to operate on globalInterestRDD, whose size I cannot reduce for Spark to handle
  val result = globalInterestRDD
    .count()

  sc.stop()

  println(result)
}
Exception in thread "dispatcher-event-loop-1" java.lang.StackOverflowError
at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
...
I guess this is highly related to http://asyncified.io/2016/12/10/mutablelist-and-the-short-path-to-a-stackoverflowerror/, but I am still struggling to understand and fix my code.
The problem is that
val globalInterestStats = mutable.Map[
  Int, (DescriptiveStatistics, mutable.MutableList[Rating])
]()
should be
val globalInterestStats = mutable.Map[
  Int, (DescriptiveStatistics, mutable.ArrayBuffer[Rating])
]()
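For completeness, a sketch of what the full change could look like (my reading of the code above, not something spelled out in the post): the ArrayBuffer has to appear both in the map's value type and at the place where the per-product list is constructed; the appends via += keep working unchanged.

// Declaration switched from MutableList to ArrayBuffer
val globalInterestStats = mutable.Map[
  Int, (DescriptiveStatistics, mutable.ArrayBuffer[Rating])
]()

// ... and later, inside the flatMap over userToInterestArr._2:
if (globalInterestStats.get(rating.product).isEmpty) {
  globalInterestStats(rating.product) = (
    new DescriptiveStatistics(),
    mutable.ArrayBuffer[Rating]()   // was mutable.MutableList[Rating]()
  )
}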
That said, I still don't understand why the Spark application works in the IDE but not with spark-submit.
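The mechanism, as far as I understand it from the linked article (so an assumption, not something confirmed in the post): MutableList is backed by a linked list of cons cells, and default Java serialization recurses once per cell, which matches the repeating writeObject0 / defaultWriteFields pattern in the trace above. A minimal standalone sketch of that failure mode, outside of Spark:

// Sketch only: reproduces the serialization depth problem with a plain
// ObjectOutputStream instead of Spark's task serialization.
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

object MutableListSerializationDemo {
  def main(args: Array[String]): Unit = {
    val list = mutable.MutableList[Int]()
    (1 to 100000).foreach(i => list += i)

    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    // With a default thread stack size this typically fails with
    // java.lang.StackOverflowError, because each linked-list cell is
    // serialized by another recursive call into writeObject0.
    oos.writeObject(list)
    oos.close()
  }
}

An ArrayBuffer of the same size is backed by a flat array, so serializing it does not recurse per element, which is presumably why the change above makes the error go away.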