Looping in Pyspark causes sparkException

Using pyspark in Zeppelin. Before I found the right way to do things (Last over a Window), I had a loop that extended the previous row's value into the current row one pass at a time (I know loops are bad practice). However, after running a few hundred passes, it fails with a nullPointerException before reaching the best case of condition = 0.

To work around that error (before I discovered the Last command), I would run the loop a few hundred times against a midpoint of condition = 1000, dump the result, run again with condition = 500, rinse and repeat until I reached condition = 0.

from pyspark.sql.functions import col, lag, when
from pyspark.sql.window import Window

def extendTarget(myDF, loop, lessThan):
    # Count the rows that still have an unknown target
    i = myDF.filter(col("target") == "unknown").count()
    while (i > lessThan):
        cc = loop
        while (cc > 0):
            # Pull the previous row's target within each id, ordered by time
            myDF = myDF.withColumn("targetPrev", lag("target", 1).over(Window.partitionBy("id").orderBy("myTime")))
            # Replace "unknown" with the previous row's value
            myDF = myDF.withColumn("targetNew", when(col("target") == "unknown", col("targetPrev")).otherwise(col("target")))
            myDF = myDF.select(
                "id",
                "myTime",
                col("targetNew").alias("target"))
            cc = cc - 1
        i = myDF.filter(col("target") == "unknown").count()
        print(i)
    return myDF

myData = spark.read.load(myPath)
myData = extendTarget(myData, 20, 0)
myData.write.parquet(myPathPart1)
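
For illustration, the dump-and-reload workaround described above might look roughly like the sketch below; the intermediate paths myPathDump1 and myPathDump2 and the midpoint thresholds are hypothetical.

myData = spark.read.load(myPath)
myData = extendTarget(myData, 20, 1000)    # stop once no more than 1000 unknowns remain
myData.write.parquet(myPathDump1)          # dump the partial result (hypothetical path)

myData = spark.read.parquet(myPathDump1)   # reload, which starts from a fresh lineage
myData = extendTarget(myData, 20, 500)
myData.write.parquet(myPathDump2)          # hypothetical path for the next dump
# ...rinse and repeat until the unknown count reaches 0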

I expected it to take a long time (since I was doing it the wrong way), but I did not expect it to throw an exception.

Output (given inputs (myData, 20, 0)):
38160
22130
11375
6625
5085
4522
4216
3936
3662
3419
3202

Error 
Py4JJavaError: An error occurred while calling o26814.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 1539.0 failed 4 times, most recent failure: Lost task 32.3 in stage 1539.0 (TID XXXX, ip-XXXX, executor 17): ExecutorLostFailure (executor 17 exited caused by one of the running tasks) Reason: Container from a bad node: container_XXXX_0001_01_000033 on host: ip-XXXX. Exit status: 50. Diagnostics: Exception from container-launch.
Container id: container_XXXX_0001_01_000033
Exit code: 50
Stack trace: ExitCodeException exitCode=50: 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
    at org.apache.hadoop.util.Shell.run(Shell.java:869)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:235)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)


Container exited with a non-zero exit code 50
.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:2029)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:2028)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:966)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
    at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2830)
    at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2829)
    at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2829)
    at sun.reflect.GeneratedMethodAccessor388.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError(u'An error occurred while calling o26814.count.\n', JavaObject id=o26815), <traceback object at 0x7efc521b11b8>)

I can only guess it has something to do with memory or caching, even though I reuse all the variable names. If it is a memory issue, is there a garbage-collection or clear-cache/memory command I could drop in next to the print so it can loop forever?
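
(For what it's worth, PySpark does expose explicit cache-management calls, sketched below against the same spark session and myDF as above; but as the comments that follow show, the root cause here is the ever-growing execution plan, which these calls do not truncate.)

import gc

myDF.unpersist()              # drop this DataFrame's cached blocks, if it was ever cached
spark.catalog.clearCache()    # drop everything cached in the current session
gc.collect()                  # Python-side garbage collection on the driver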

Again, I know using loops is bad practice, especially loops that seem to go on forever, but sometimes the better/smarter code doesn't come to me when I need it, so in the meantime I hack together what I can.

Answered by cronoik in the comments

Could you please try the following (this will keep the execution plan small): myDF = myDF.select("id", "myTime", col("targetNew").alias("target")).checkpoint() (replace the corresponding line from before, and set a checkpoint directory beforehand via spark.setCheckpointDir). – cronoik Sep 9 at 4:05

I tested it out and it worked! Although it failed the first few times, mostly due to my misunderstanding of how checkpoints work. Do you want to post your comment as an answer? A few things I had to figure out would be good to have as part of the answer: spark.sparkContext.setCheckpointDir("path/") sets the checkpoint path; myDF.checkpoint() alone does nothing, you have to reassign the checkpointed DataFrame, myDF = myDF.checkpoint(). Also, regarding the nullPointerException: if I told the loop to run 200 times before counting, it hit a nullPointerException and the sparkContext died (had to restart). – Ranald Fong Sep 10 at 2:44

Also, putting the checkpoint inside every inner loop pass made it run slower, presumably because it was checkpointing on each pass. What really sped it up was placing the checkpoint right before the print, letting it loop in memory x amount of times (I used 100) before checkpointing. Thanks cronoik; if you post your comment as an answer I'll mark it as the answer! – Ranald Fong Sep 10 at 2:53

Edit: a commenter wanted details

The point is to extend a value from one row down through the following rows, replacing all of the unknowns:

Before:

| id | time | target |
| a  | 1:00 | 1      |
| a  | 1:01 | unknown|
| a  | .    | .      |
| a  | 5:00 | unknown|
| a  | 5:01 | 2      |

After:

| id | time | target |
| a  | 1:00 | 1      |
| a  | 1:01 | 1      |
| a  | .    | 1      |
| a  | 5:00 | 1      |
| a  | 5:01 | 2      |

Code changed to use checkpoints:

from pyspark.sql.functions import col, lag, when
from pyspark.sql.window import Window

# Checkpoints need a directory on reliable storage
spark.sparkContext.setCheckpointDir(".../myCheckpointsPath/")

def extendTarget(myDF, loop, lessThan):
    i = myDF.filter(col("target") == "unknown").count()
    while (i > lessThan):
        cc = loop
        while (cc > 0):
            myDF = myDF.withColumn("targetPrev", lag("target", 1).over(Window.partitionBy("id").orderBy("myTime")))
            myDF = myDF.withColumn("targetNew", when(col("target") == "unknown", col("targetPrev")).otherwise(col("target")))
            myDF = myDF.select(
                "id",
                "myTime",
                col("targetNew").alias("target"))
            cc = cc - 1
        i = myDF.filter(col("target") == "unknown").count()
        print(i)
        # Checkpoint after each batch of `loop` passes: materializes the data and
        # truncates the lineage so the plan stays small. Reassignment is required.
        myDF = myDF.checkpoint()
    return myDF

myData = spark.read.load(myPath)
myData = extendTarget(myData, 20, 0)
myData.write.parquet(myPathPart1)

At the cost of hard drive space for the checkpoints, this lets the loop carry on indefinitely! But in general, don't use loops (especially ones that run indefinitely); use First and Last with ignoreNulls instead.
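
For reference, a minimal sketch of that window-based approach (forward-filling the unknowns with last over a window, using ignorenulls), assuming the same column layout as the tables above:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, last, when
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
myData = spark.read.load(myPath)

# Most recent non-"unknown" target per id, up to and including the current row
w = (Window.partitionBy("id")
           .orderBy("myTime")
           .rowsBetween(Window.unboundedPreceding, Window.currentRow))

myData = myData.withColumn(
    "target",
    last(when(col("target") != "unknown", col("target")), ignorenulls=True).over(w))

myData.write.parquet(myPathPart1)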