How to improve Spark performance in an EMR cluster

I was looking for a consolidated place where I could see all the high-level parameters that need tuning in a Spark job to get better performance out of the cluster, assuming enough nodes have already been allocated. I did go through the configuration page, but it is too much to process at once: https://spark.apache.org/docs/2.4.5/configuration.html#available-properties

I have listed my findings below; they should give people a first overview before deep diving into the link above for their own use case.

Below is the list of parameters I found helpful for tuning jobs. I will keep appending to it whenever I find a new use case for a parameter.

Parameter and what to look for:
spark.scheduler.mode: FAIR or FIFO. This decides how you want executors to be shared between concurrent jobs within the application.
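
A minimal PySpark sketch of setting it, assuming the session is built in application code (the app name is a placeholder):

```python
from pyspark.sql import SparkSession

# FAIR lets concurrent jobs inside the same application share executors
# round-robin; FIFO (the default) runs them in submission order.
spark = (
    SparkSession.builder
    .appName("scheduler-mode-example")       # placeholder app name
    .config("spark.scheduler.mode", "FAIR")  # or "FIFO"
    .getOrCreate()
)
```
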
executor-memory: Check for OOM in the executors. If you find they are going out of memory, this is probably the value to raise; also check the executor-cores value, in case the cores-to-memory ratio is putting too much load on each executor.
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html
driver-memory: If you are doing a collect-style operation (i.e. any operation that sends data back to the driver), then look at tuning this value.
executor-cores: Its value really depends on what kind of processing you are doing: a multi-threaded approach or a lightweight process. The doc below can help you understand it better (a sizing sketch follows).
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html
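
A minimal sizing sketch with illustrative values only; the right numbers depend on your EMR instance types. Note that driver memory normally has to be given to spark-submit (--driver-memory) because it must be set before the driver JVM starts:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("sizing-example")              # placeholder app name
    .config("spark.executor.memory", "8g")  # raise this if executors go OOM
    .config("spark.executor.cores", "4")    # concurrent tasks per executor
    .getOrCreate()
)
```
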
spark.default.parallelism: This helped us quite a bit in reducing job execution time. Initially run the job without setting this value and observe what default the cluster picks (it is based on cluster size). If the value looks too high, try to reduce it. We generally derived it from the logic below (a sketch covering this and the next parameter follows):

number of max core nodes * number of threads per machine + number of max on-demand nodes * number of threads per machine + number of max spot nodes * number of threads per machine

spark.sql.shuffle.partitions: This value is used when your job shuffles a lot of data, e.g. a DataFrame with cross joins, or an inner join that is not repartitioned on the join clause.
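
A sketch of the formula above with made-up node counts; it also sets spark.sql.shuffle.partitions to the same value, which is a choice we found reasonable rather than a rule:

```python
from pyspark.sql import SparkSession

# Illustrative fleet limits; substitute your own EMR instance group sizes.
max_core_nodes = 4
max_on_demand_nodes = 2
max_spot_nodes = 6
threads_per_machine = 8  # e.g. vCPUs usable by YARN per instance

# The formula from the text: total threads across all node groups.
parallelism = (
    max_core_nodes * threads_per_machine
    + max_on_demand_nodes * threads_per_machine
    + max_spot_nodes * threads_per_machine
)  # 4*8 + 2*8 + 6*8 = 96

spark = (
    SparkSession.builder
    .appName("parallelism-example")                            # placeholder
    .config("spark.default.parallelism", str(parallelism))     # RDD-side default
    .config("spark.sql.shuffle.partitions", str(parallelism))  # DataFrame shuffles
    .getOrCreate()
)
```
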
dynamic executor allocation: This saved us quite a bit of the pain of allocating the exact number of executors to each job. Try to tune the settings below (a combined sketch follows this list):
spark.dynamicAllocation.minExecutors: This many executors are needed to start your application, otherwise it will not start. It is quite helpful when you do not want your job to crawl along on the 1 or 2 executors that happen to be available.
spark.dynamicAllocation.maxExecutors: The maximum number of executors that can be used, so the job does not end up consuming all cluster resources when it is a multi-job cluster running parallel jobs.
spark.dynamicAllocation.initialExecutors: This is quite helpful when the driver does some initial work before handing tasks to the executors, e.g. listing the files in a folder so that only those files are deleted at the end of the job. It lets you keep minExecutors low while still getting a head start, knowing that the driver is going to take some time before the executors have work.
spark.dynamicAllocation.executorIdleTimeout: This also helps in the case above, where the driver is doing some work and has nothing to assign to the executors yet, and you do not want them to time out, causing a reallocation of executors that takes time.
https://spark.apache.org/docs/2.4.5/configuration.html#dynamic-allocation
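
A combined sketch of the four settings with placeholder counts; dynamic allocation also needs spark.dynamicAllocation.enabled and the external shuffle service, which EMR typically turns on by default:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("dynamic-allocation-example")                     # placeholder
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")       # do not crawl on 1-2 executors
    .config("spark.dynamicAllocation.maxExecutors", "50")      # cap on a shared cluster
    .config("spark.dynamicAllocation.initialExecutors", "10")  # head start after driver-side setup
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")  # keep idle executors around longer
    .getOrCreate()
)
```
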
Reducing the number of files created while writing partitions: Since our data is processed by different executors, each executor writes its own file when the output is written. This ends up creating a large number of small files, and in turn queries on them become heavy. There are two ways to control it, plus a write option (a sketch follows):
coalesce: tries to do the minimum shuffle across executors and will create unevenly sized files.
repartition: does a full shuffle of the data and creates files of roughly equal size.
maxRecordsPerFile: this write option tells Spark how many records per file you are looking for at most.
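
A sketch of the three options; the S3 paths and partition counts are hypothetical:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("write-example").getOrCreate()  # placeholder

df = spark.read.parquet("s3://my-bucket/input/")  # hypothetical input path

# coalesce: minimal shuffle, but file sizes can come out uneven.
df.coalesce(16).write.mode("overwrite").parquet("s3://my-bucket/out-coalesce/")

# repartition: full shuffle, files of roughly equal size.
df.repartition(16).write.mode("overwrite").parquet("s3://my-bucket/out-repartition/")

# maxRecordsPerFile: cap how many records end up in any single file.
(df.repartition(16)
   .write.mode("overwrite")
   .option("maxRecordsPerFile", 1_000_000)
   .parquet("s3://my-bucket/out-capped/"))
```
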
Joining a small DF with a large DF: Check whether you can broadcast the small DF. By default Spark uses a sort-merge join, but if one of your tables is quite small, see if you can broadcast it.
https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c
How you can hint Spark to use broadcasting (a sketch follows the parameters below). The parameters you need to look at for broadcast joins are:
spark.sql.autoBroadcastJoinThreshold: helps Spark decide, for a given DF size, whether to use a broadcast join or not.
spark.driver.maxResultSize: the maximum result size that can be returned to the driver, so that it can broadcast it.
driver-memory: as the driver is doing the broadcasting of the result, this needs to be bigger.
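
A sketch of a broadcast join, with illustrative thresholds and hypothetical tables. Driver memory itself normally has to be set before the driver JVM starts (e.g. spark-submit --driver-memory 4g), so it is not set from inside the application here:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = (
    SparkSession.builder
    .appName("broadcast-join-example")                                      # placeholder
    .config("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))  # 50 MB, illustrative
    .config("spark.driver.maxResultSize", "2g")                             # illustrative
    .getOrCreate()
)

large_df = spark.read.parquet("s3://my-bucket/events/")     # hypothetical
small_df = spark.read.parquet("s3://my-bucket/customers/")  # hypothetical

# Explicit hint: broadcast the small side instead of letting Spark pick sort-merge.
joined = large_df.join(broadcast(small_df), on="customer_id", how="inner")
```
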
spark.network.timeout / spark.executor.heartbeatInterval: These help in cases where you see abrupt termination of executors by the driver. There can be multiple reasons, but if nothing specific turns up, check these parameters (a sketch follows the link below).
https://spark.apache.org/docs/2.4.5/configuration.html#execution-behavior
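
A minimal sketch with illustrative values; the Spark docs note that spark.executor.heartbeatInterval should stay well below spark.network.timeout:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("timeout-example")                          # placeholder
    .config("spark.network.timeout", "600s")             # default is 120s
    .config("spark.executor.heartbeatInterval", "30s")   # default is 10s
    .getOrCreate()
)
```
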
Data is skewed across customers: Try to find a way to trigger the per-customer jobs in descending order of data volume. This ensures the cluster is well occupied during the initial run, and the long-running customers get a head start while the small customers are finishing their jobs. Also, you can skip a customer entirely if no data is present for it, to reduce the load on the cluster. A sketch of this ordering is below.
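
A sketch of the ordering idea, assuming a hypothetical events table keyed by customer_id and a hypothetical per-customer job function:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("skew-ordering-example").getOrCreate()  # placeholder

events = spark.read.parquet("s3://my-bucket/events/")  # hypothetical input

# Row count per customer, heaviest first. Customers with no data never
# appear in the grouped result, so they are skipped automatically.
volumes = (
    events.groupBy("customer_id")
    .count()
    .orderBy(col("count").desc())
    .collect()
)

def run_job_for_customer(customer_id):
    """Placeholder for whatever per-customer processing your pipeline does."""
    ...

# Heaviest customers first: they get a head start while the small ones
# finish quickly alongside them.
for row in volumes:
    run_job_for_customer(row["customer_id"])
```
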