toPandas() works from a Jupyter iPython Notebook but fails on spark-submit - AWS EMR

I have a program that: 1. reads some data, 2. performs some operations, 3. saves a CSV file, 4. transfers that file over FTP.

I am using an Amazon EMR cluster with PySpark to accomplish this task.

For step 4, I need the CSV saved to local storage rather than HDFS. To do that, I convert the Spark DataFrame to a Pandas DataFrame.

The snippet looks something like this:

from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.mllib.evaluation import *
from pyspark.sql.functions import *
from pyspark.sql import Row
from time import time
import timeit
from datetime import datetime, timedelta
import numpy as np
import random as rand
import pandas as pd
from itertools import combinations, permutations
from collections import defaultdict
from ftplib import FTP
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("Recommendation").set('spark.driver.memory', '8G').set('spark.executor.memory', '4G')
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)


readRdd = sqlContext.read.format('com.databricks.spark.csv').load('s3n://my-bucket/myfile' + path)

df = readRdd.toPandas() # <---------- PROBLEM
print('toPandas() completed')

df.to_csv('./myFile')
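
Step 4 (the FTP transfer) is not shown above; a minimal sketch using ftplib might look like the following, where the host, credentials, and remote filename are placeholders rather than values from the original job:

from ftplib import FTP

# Placeholder connection details -- not taken from the original job
ftp = FTP('ftp.example.com')
ftp.login(user='myUser', passwd='myPassword')

# Upload the locally written CSV in binary mode
with open('./myFile', 'rb') as f:
    ftp.storbinary('STOR myFile.csv', f)

ftp.quit()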

The problem is this:

When I run this code from a Jupyter iPython notebook on the same cluster, it works like a charm. But when I run it with spark-submit, or add it as a step on EMR, the code fails on the following line:

df = readRdd.toPandas() 

'toPandas() completed' is never printed.

In the Spark job monitor I can see that the toPandas() call has executed, but immediately afterwards I get the error below.

16/10/10 13:17:47 INFO YarnAllocator: Driver requested a total number of 1 executor(s).
16/10/10 13:17:47 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:47 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:47 INFO TaskSetManager: Finished task 1462.0 in stage 17.0 (TID 10624) in 2089 ms on ip-172-31-38-70.eu-west-1.compute.internal (1515/1516)
16/10/10 13:17:47 INFO TaskSetManager: Finished task 1485.0 in stage 17.0 (TID 10647) in 2059 ms on ip-172-31-38-70.eu-west-1.compute.internal (1516/1516)
16/10/10 13:17:47 INFO YarnClusterScheduler: Removed TaskSet 17.0, whose tasks have all completed, from pool 
16/10/10 13:17:47 INFO DAGScheduler: ResultStage 17 (toPandas at 20161007_RecPipeline.py:182) finished in 12.609 s
16/10/10 13:17:47 INFO DAGScheduler: Job 4 finished: toPandas at 20161007_RecPipeline.py:182, took 14.646644 s
16/10/10 13:17:47 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
16/10/10 13:17:47 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:47 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:50 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:50 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:53 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:53 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:56 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:56 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:59 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:59 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:02 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:02 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:05 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:05 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:08 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:08 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:11 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:11 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:14 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:14 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:17 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:17 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:20 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:20 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:23 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:23 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:26 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:26 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:29 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:29 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:32 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:32 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:35 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:35 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:36 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/10 13:18:36 INFO SparkContext: Invoking stop() from shutdown hook
16/10/10 13:18:36 INFO SparkUI: Stopped Spark web UI at http://172.31.37.28:45777
16/10/10 13:18:36 INFO YarnClusterSchedulerBackend: Shutting down all executors
16/10/10 13:18:36 INFO YarnClusterSchedulerBackend: Asking each executor to shut down
16/10/10 13:18:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/10/10 13:18:36 ERROR PythonRDD: Error while sending iterator
java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write(PythonRDD.scala:440)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anon$$anonfun$run.apply$mcV$sp(PythonRDD.scala:648)
    at org.apache.spark.api.python.PythonRDD$$anon$$anonfun$run.apply(PythonRDD.scala:648)
    at org.apache.spark.api.python.PythonRDD$$anon$$anonfun$run.apply(PythonRDD.scala:648)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
    at org.apache.spark.api.python.PythonRDD$$anon.run(PythonRDD.scala:649)
16/10/10 13:18:36 ERROR ApplicationMaster: User application exited with status 143
16/10/10 13:18:36 INFO ApplicationMaster: Final app status: FAILED, exitCode: 143, (reason: User application exited with status 143)
16/10/10 13:18:36 INFO MemoryStore: MemoryStore cleared
16/10/10 13:18:36 INFO BlockManager: BlockManager stopped
16/10/10 13:18:36 INFO BlockManagerMaster: BlockManagerMaster stopped
16/10/10 13:18:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/10/10 13:18:36 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/10/10 13:18:36 INFO SparkContext: Successfully stopped SparkContext
16/10/10 13:18:36 INFO ShutdownHookManager: Shutdown hook called
16/10/10 13:18:36 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt3/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-eab43d4e-7201-4bcb-8ee7-0e7b546e8fd8
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1d88398f-ecd5-4d94-a42a-a406b3d566af/pyspark-34bec23c-a686-475d-85c9-9e9228b23239
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1d88398f-ecd5-4d94-a42a-a406b3d566af
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt3/yarn/usercache/hadoop/appcache/application_1476100925559_0002/container_1476100925559_0002_01_000001/tmp/spark-96cdee47-e3f3-45f4-8bc7-0df5928ef53c
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt2/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-f6821ea1-6f37-4cc6-8bba-049ac0215786
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt1/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1827cae8-8a60-4b29-a4e5-368a8e1856fd

My cluster configuration is as follows:

spark-defaults  spark.driver.maxResultSize  8G
spark-defaults  spark.driver.memory 8G
spark-defaults  spark.executor.memory   4G
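
For reference, these values would typically be supplied through an EMR configuration classification when the cluster is created; a rough sketch of that JSON (the surrounding create-cluster call is omitted):

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.driver.maxResultSize": "8G",
      "spark.driver.memory": "8G",
      "spark.executor.memory": "4G"
    }
  }
]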

The spark-submit command looks like this:

spark-submit --deploy-mode cluster s3://my-bucket/myPython.py

I am at my wits' end! Can someone please point me in a direction to look?

The problem turned out to be here:

spark-submit --deploy-mode cluster s3://my-bucket/myPython.py

In the command above, the deploy mode is set to cluster, which means one of the core nodes is chosen to run the driver. Because the allowed driver memory is 8G and the core nodes were smaller physical instances, they always ran out of the memory the driver needed.

The solution was to deploy in client mode, where the driver always runs on the master node (in my case a larger physical instance with more resources) and does not run out of memory over the whole run.
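
Concretely, the submit command then becomes:

spark-submit --deploy-mode client s3://my-bucket/myPython.py

(Client mode is also the default deploy mode for spark-submit, so dropping the flag entirely has the same effect.)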

Because this is a dedicated cluster, this solution works in my case.

On a shared cluster, where the deploy mode has to be cluster, using larger core instances should work.