计数操作导致更多 rack_local pyspark

Count operation resulting in more rack_local pyspark

我试图了解 Spark 集群上的位置级别及其与 RDD 分区数的关系以及对其执行的操作。具体来说,我有一个分区数为 9647 的数据框。然后,我对其执行 df.count 并在 Spark UI:

中观察到以下内容

一些上下文,我使用以下配置将作业提交到 Yarn 集群:

- executor_memory='10g',
- driver_memory='10g',
- num_executors='5',
- executor_cores=5'

此外,我注意到所有执行程序都来自 5 个不同的节点(主机)。

从图中,我发现在所有9644个任务中,超过95%的任务不在运行同一个节点内。所以,我只是想知道有很多 rack_local 的原因。具体来说,为什么节点不选择最近的数据源执行,换句话说,在本地有更多的节点?[​​=14=]

谢谢

这里有几点需要考虑。

您可以在下面找到一些影响 Spark 中数据局部性的因素:

  1. 最初 Spark 将尝试从其基础 class.
  2. 放置 task as close as possible to the node where the source data exists. For instance if the source system is HDFS, Spark will try to execute the task in the same node where the data of the specific partition exists. Spark will find the preferred location for each RDD by implementing the getPreferredLocations. Later on the TaskScheduler will leverage this information to decide about the locality of the task. In the definition of the RDD you can find the definition of the getPreferredLocations which is responsible for specifying the optimal location of the RDD. For example, if the source is HDFS Spark will create an instance of HadoopRDD (or NewHadoopRDD) and it will access the Hadoop API to retrieve the information about the location of the source files overridinggetPreferredLocations 函数
  3. 无法实现高局部性eg:PROCESS_LOCAL或NODE_LOCAL的主要原因是目标节点资源不足。 Spark 使用设置 spark.locality.wait 来设置 等待时间 应该采取关于 locality 级别的决定。 Spark 将使用此设置等待资源可用的特定时间。如果在 spark.locality.wait 间隔到期后节点上没有 资源(核心) 可用,那么 Spark 将 降级 位置级别例如:PROCESS_LOCAL -> NODE_LOCAL 新的降级级别会发生同样的情况,直到满足所需的资源规格。另一方面,升级 任务位置的一种方法是添加更多资源,例如:添加一个新的执行程序。找到的测试 here(第 915 行)演示了这种情况。默认值是 3 秒,如果你认为你应该给你的任务更多的时间,你可能会决定增加这个值,尽管不建议(可以低效地增加 Spark 空闲时间)。
  4. 如果您的数据位于 Spark 集群之外,那么位置级别将设置为任意。

我对改进局部性的最后建议是使用 repartition() + persist() or cache() 让 Spark 知道分区的位置。

注意:持久化会在第一次调用动作后生效。

有用链接:

https://www.waitingforcode.com/apache-spark/spark-data-locality/read

http://www.russellspitzer.com/2017/09/01/Spark-Locality/

https://github.com/apache/spark/blob/0bb716bac38488bc216fbda29ce54e93751e641b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala