Looping in Pyspark causes sparkException
Using pyspark in Zeppelin.
Before I found the correct way of doing things (last over a Window), I had a loop that extended the value of the previous row to the current row one step at a time (I know loops are bad practice). However, after running a few hundred iterations it failed with a nullPointerException before it ever reached the best-case condition = 0.
To work around that error (before I discovered last), I ran the loop a few hundred times against a midpoint condition = 1000, dumped the results, ran again with condition = 500, and rinsed and repeated until condition = 0 was reached.
from pyspark.sql import Window
from pyspark.sql.functions import col, lag, when

def extendTarget(myDF, loop, lessThan):
    i = myDF.filter(col("target") == "unknown").count()
    while (i > lessThan):
        cc = loop
        while (cc > 0):
            # pull the previous row's target forward within each id, ordered by time
            myDF = myDF.withColumn("targetPrev", lag("target", 1).over(Window.partitionBy("id").orderBy("myTime")))
            # replace "unknown" with the previous row's value, keep known values as-is
            myDF = myDF.withColumn("targetNew", when(col("target") == "unknown", col("targetPrev")).otherwise(col("target")))
            myDF = myDF.select(
                "id",
                "myTime",
                col("targetNew").alias("target"))
            cc = cc - 1
        i = myDF.filter(col("target") == "unknown").count()
        print i
    return myDF

myData = spark.read.load(myPath)
myData = extendTarget(myData, 20, 0)
myData.write.parquet(myPathPart1)
I expected it to take a long time (since I was doing it the wrong way), but I did not expect it to fail with an exception.
Output (given inputs (myData, 20, 0)):
38160
22130
11375
6625
5085
4522
4216
3936
3662
3419
3202
Error
Py4JJavaError: An error occurred while calling o26814.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 1539.0 failed 4 times, most recent failure: Lost task 32.3 in stage 1539.0 (TID XXXX, ip-XXXX, executor 17): ExecutorLostFailure (executor 17 exited caused by one of the running tasks) Reason: Container from a bad node: container_XXXX_0001_01_000033 on host: ip-XXXX. Exit status: 50. Diagnostics: Exception from container-launch.
Container id: container_XXXX_0001_01_000033
Exit code: 50
Stack trace: ExitCodeException exitCode=50:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
at org.apache.hadoop.util.Shell.run(Shell.java:869)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:235)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 50
.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:2029)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:2028)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:966)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2830)
at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2829)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2829)
at sun.reflect.GeneratedMethodAccessor388.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError(u'An error occurred while calling o26814.count.\n', JavaObject id=o26815), <traceback object at 0x7efc521b11b8>)
I can only guess this has something to do with memory or caching, even though I reuse all the variable names. If it is a memory issue, is there a garbage-collection or clear-cache/memory command I could put next to the print statement so it can loop forever?
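(For reference, Spark does have cache-clearing calls; a minimal sketch of the standard PySpark APIs is below. As it turned out, though, they would not have helped here: they only release cached data, while the real problem was the ever-growing execution plan, which checkpointing addresses.)
spark.catalog.clearCache()   # drops every cached table/DataFrame in the session
myDF.unpersist()             # drops the cache of this one DataFrame only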
Again, I know using loops is bad practice, especially loops that seem to go on forever, but sometimes the better/smarter code doesn't come to me when I need it, so in the meantime I hack as best I can.
Answered in the comments by cronoik
Can you please try the following (this will keep the execution plan small): myDF = myDF.select("id", "myTime", col("targetNew").alias("target")).checkpoint() (replace the corresponding line from before, and set a checkpoint directory beforehand via spark.setCheckpointDir). – cronoik Sep 9 at 4:05
I tested it out and it worked! Although it failed the first few times, mostly because of my misunderstanding of how checkpointing works. Do you want to post your comment as an answer? A few things I had to figure out that would be good as part of the answer: spark.sparkContext.setCheckpointDir("path/") sets the checkpoint path; myDF.checkpoint() on its own does nothing, you have to reassign the checkpointed DataFrame: myDF = myDF.checkpoint(). Also, I tracked down the null pointer exception: if I tell the loop to run 200 times before a count, it hits a nullPointerException and the sparkContext dies (had to restart it). – Ranald Fong Sep 10 at 2:44
Also, checkpointing inside every inner loop iteration made it run slowly, presumably because it was checkpointing on every single pass. What really sped it up was placing the checkpoint right before the print, letting it loop x times in memory (I used 100) before checkpointing. Thanks cronoik, if you post your comment as an answer I'll mark it as the accepted one! – Ranald Fong Sep 10 at 2:53
Edit: a commenter asked for more detail.
The point is to extend a value from one row onward, replacing all the unknowns:
| id | time | target  |
| a  | 1:00 | 1       |
| a  | 1:01 | unknown |
| a  | .    | .       |
| a  | 5:00 | unknown |
| a  | 5:01 | 2       |
to
| id | time | target |
| a  | 1:00 | 1      |
| a  | 1:01 | 1      |
| a  | .    | 1      |
| a  | 5:00 | 1      |
| a  | 5:01 | 2      |
Code changed to use checkpoints:
from pyspark.sql import Window
from pyspark.sql.functions import col, lag, when

spark.sparkContext.setCheckpointDir(".../myCheckpointsPath/")

def extendTarget(myDF, loop, lessThan):
    i = myDF.filter(col("target") == "unknown").count()
    while (i > lessThan):
        cc = loop
        while (cc > 0):
            myDF = myDF.withColumn("targetPrev", lag("target", 1).over(Window.partitionBy("id").orderBy("myTime")))
            myDF = myDF.withColumn("targetNew", when(col("target") == "unknown", col("targetPrev")).otherwise(col("target")))
            myDF = myDF.select(
                "id",
                "myTime",
                col("targetNew").alias("target"))
            cc = cc - 1
        i = myDF.filter(col("target") == "unknown").count()
        print i
        # checkpoint (and reassign!) once per outer iteration, not per inner pass,
        # so the execution plan is truncated only every `loop` iterations
        myDF = myDF.checkpoint()
    return myDF

myData = spark.read.load(myPath)
myData = extendTarget(myData, 20, 0)
myData.write.parquet(myPathPart1)
At the cost of disk space for the checkpoints, this lets the loop go on as long as it needs to! But in general, don't use loops (especially seemingly infinite ones); use first and last over a window with ignoreNulls instead.
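For completeness, here is a minimal sketch of that window-based approach (last over a window with ignorenulls), reusing the column names and paths from the question:
from pyspark.sql import Window
from pyspark.sql.functions import col, last, when

# everything from the start of the partition up to and including the current row
w = Window.partitionBy("id").orderBy("myTime").rowsBetween(Window.unboundedPreceding, Window.currentRow)

myData = spark.read.load(myPath)
# null out the "unknown" markers, then carry the last non-null target forward per id
myData = myData.withColumn("target", when(col("target") != "unknown", col("target")))
myData = myData.withColumn("target", last("target", ignorenulls=True).over(w))
myData.write.parquet(myPathPart1)
A single window pass replaces the whole loop, so the plan never grows and no checkpointing is needed.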