Amazon EMR Pyspark:rdd.distinct.count() 失败

Amazon EMR Pyspark: rdd.distinct.count() failling

我目前正在使用连接到 RDS 的 EMR 集群来收集 2 table。

创建的两个 RDD 相当大,但我可以执行其他的 .take(x) 操作。

我还可以执行更复杂的操作,例如:

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda)
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1])))

但是执行以下操作来统计从 RDS 导入的不同用户数不起作用:

unique_users = rdd.distinct.count()

之前试过很多配置看是不是内存问题(以防万一,但没有解决问题)...

这些是我现在遇到的错误:

Traceback (most recent call last):
File "/home/hadoop/AppEngine/src/server.py", line 56, in <module>
run_server()
File "/home/hadoop/AppEngine/src/server.py", line 53, in run_server
AppServer().run()
File "/home/hadoop/AppEngine/src/server.py", line 45, in run
api = create_app(self.context, self.apps, self.devices)
File "/home/hadoop/AppEngine/src/api.py", line 190, in create_app
engine = AppEngine(spark_context, apps, devices)
File "/home/hadoop/AppEngine/src/engine.py", line 56, in __init__
self.unique_users = self.ratings.distinct().count()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco

File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in  stage 0.0 failed 4 times, most recent failure: Lost task 0.5 in stage 0.0 (TID 5, ip-172-31-3-140.eu-west-1.compute.internal, executor 13): ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 164253 ms
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:935)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)`

ExecutorLostFailure 原因:执行器心跳在 164253 毫秒后超时

这个错误的意思是执行器在165秒后没有响应,被kill掉了(假设它已经死了)

如果你碰巧有一个任务占用执行器这么长时间,需要执行你可以在spark-submit命令行中尝试以下设置,这将增加心跳超时如此处所述的大量时间:

可以在此处找到一些如何调查此问题的方法:


下面将尝试澄清您问题中提出的一些问题。

Spark Actions vs Transformations

Spark 使用惰性计算,即当您执行 transformation 时它不会执行。 Spark 仅在执行 action

时执行

在您给出的复杂操作示例中没有任何操作(即什么都没有 executed/computed):

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda)
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1])))

正在审核spark doc about transformation

你可以看到示例中使用的所有操作:mapgroupByKeyjoin都是转换。

因此在您执行这些命令后实际上什么也没做。

动作的区别

The two RDD created are quite huge but I can perform .take(x) operations other them.

take(x) 动作与 count

有区别

take(x) 操作在返回前 x 个元素后结束。

count() action只有通过整个RDD

才结束

您执行了一些 似乎是 运行 的转换(如示例中所示)这一事实没有任何意义 - 因为它们没有被执行。

运行 take(x) 操作无法给出任何指示,因为它只会使用您的 RDD 的很小一部分。

结论

您的机器配置似乎不支持您正在使用的数据大小,或者您的代码创建了巨大的任务,导致执行程序挂起很长一段时间(160 秒)。

在你的 RDD 上实际执行的第一个 actioncount action

问题的解决方案如下:

我没有足够的内存来执行任务。 我将集群中使用的核心实例的类型更改为具有更多可用内存的实例(此处为 m4.4xlarge)。

然后我必须精确参数以强制为 spark-sumbmit 实例分配内存:

--driver-memory 2G
--executor-memory 50G

您还可以添加这些参数来避免长时间任务因心跳或内存分配而失败:

--conf spark.yarn.executor.memoryOverhead=XXX (large number such as 1024 or 4096)
--conf spark.executor.heartbeatInterval=60s