Spark 核心和任务并发
Spark cores & tasks concurrency
我有一个关于 spark 的非常基本的问题。我通常 运行 使用 50 个核心来激发作业。在查看工作进度时,大多数时候它会显示 50 个进程 运行ning 并行(正如它应该做的那样),但有时它只显示 2 或 4 个 spark 进程 运行ning 并行。像这样:
[Stage 8:================================> (297 + 2) / 500]
正在处理的 RDD repartitioned
在 100 多个分区上。所以这应该不是问题。
不过我有一个观察。我已经看到大多数情况下它发生的模式,SparkUI 中的数据局部性显示 NODE_LOCAL
,而其他时候所有 50 个进程都是 运行ning,一些进程显示 RACK_LOCAL
.
这让我怀疑,也许发生这种情况是因为数据在同一节点中处理之前被缓存以避免网络开销,这减慢了进一步处理的速度。
如果是这种情况,有什么办法可以避免。如果不是这样,这是怎么回事?
经过一周或更长时间的努力解决这个问题,我想我已经找到了导致问题的原因。
如果您遇到同样的问题,最好先检查 Spark 实例是否配置正确。有一个很棒的cloudera blog post。
但是,如果问题不在于配置(我就是这种情况),那么问题出在您的代码中。问题是,有时由于不同的原因(倾斜连接、数据源中的不均匀分区等),您正在处理的 RDD 在 2-3 个分区上获取大量数据,而其余分区的数据很少。
为了减少网络中的数据洗牌,Spark 尝试让每个执行器处理驻留在该节点本地的数据。因此,有 2-3 个 executor 工作了很长时间,其余的 executor 在几毫秒内就处理完了数据。这就是我遇到上述问题中描述的问题的原因。
调试这个问题的方法是首先检查你的RDD的分区大小。如果一个或几个分区与其他分区相比非常大,那么下一步就是在大分区中找到记录,这样您就可以知道,尤其是在倾斜连接的情况下,哪个键发生了倾斜。我写了一个小函数来调试这个:
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
它给了我最小和最大的分区大小,如果这两者之间的差异超过 5 倍,它会打印最大分区的 5 个元素,让您大致了解发生了什么。
一旦你发现问题是偏斜分区,你可以找到一种方法来摆脱那个偏斜键,或者你可以重新分区你的数据帧,这将迫使它得到平均分布,你现在将看到所有执行程序将在相同的时间内工作,并且您将看到可怕的 OOM 错误少得多,处理速度也将显着加快。
这些只是我作为 Spark 新手的两分钱,我希望 Spark 专家可以为这个问题补充更多,因为我认为 Spark 世界中的很多新手经常面临类似的问题。
我有一个关于 spark 的非常基本的问题。我通常 运行 使用 50 个核心来激发作业。在查看工作进度时,大多数时候它会显示 50 个进程 运行ning 并行(正如它应该做的那样),但有时它只显示 2 或 4 个 spark 进程 运行ning 并行。像这样:
[Stage 8:================================> (297 + 2) / 500]
正在处理的 RDD repartitioned
在 100 多个分区上。所以这应该不是问题。
不过我有一个观察。我已经看到大多数情况下它发生的模式,SparkUI 中的数据局部性显示 NODE_LOCAL
,而其他时候所有 50 个进程都是 运行ning,一些进程显示 RACK_LOCAL
.
这让我怀疑,也许发生这种情况是因为数据在同一节点中处理之前被缓存以避免网络开销,这减慢了进一步处理的速度。
如果是这种情况,有什么办法可以避免。如果不是这样,这是怎么回事?
经过一周或更长时间的努力解决这个问题,我想我已经找到了导致问题的原因。
如果您遇到同样的问题,最好先检查 Spark 实例是否配置正确。有一个很棒的cloudera blog post。
但是,如果问题不在于配置(我就是这种情况),那么问题出在您的代码中。问题是,有时由于不同的原因(倾斜连接、数据源中的不均匀分区等),您正在处理的 RDD 在 2-3 个分区上获取大量数据,而其余分区的数据很少。
为了减少网络中的数据洗牌,Spark 尝试让每个执行器处理驻留在该节点本地的数据。因此,有 2-3 个 executor 工作了很长时间,其余的 executor 在几毫秒内就处理完了数据。这就是我遇到上述问题中描述的问题的原因。
调试这个问题的方法是首先检查你的RDD的分区大小。如果一个或几个分区与其他分区相比非常大,那么下一步就是在大分区中找到记录,这样您就可以知道,尤其是在倾斜连接的情况下,哪个键发生了倾斜。我写了一个小函数来调试这个:
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
它给了我最小和最大的分区大小,如果这两者之间的差异超过 5 倍,它会打印最大分区的 5 个元素,让您大致了解发生了什么。
一旦你发现问题是偏斜分区,你可以找到一种方法来摆脱那个偏斜键,或者你可以重新分区你的数据帧,这将迫使它得到平均分布,你现在将看到所有执行程序将在相同的时间内工作,并且您将看到可怕的 OOM 错误少得多,处理速度也将显着加快。
这些只是我作为 Spark 新手的两分钱,我希望 Spark 专家可以为这个问题补充更多,因为我认为 Spark 世界中的很多新手经常面临类似的问题。