Spark 优化 - 加入 - 任务数量非常少 - OOM
Spark optimization - joins - very low number of task - OOM
我的 spark 应用程序失败并出现此错误:Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
这是我检查容器日志时得到的:java.lang.OutOfMemoryError: Java heap space
我的应用程序主要是获取 table 然后加入我从 aws S3 读取的不同 table:
var result = readParquet(table1)
val table2 = readParquet(table2)
result = result.join(table2 , result(primaryKey) === table2(foreignKey))
val table3 = readParquet(table3)
result = result.join(table3 , result(primaryKey) === table3(foreignKey))
val table4 = readParquet(table4)
result = result.join(table4 , result(primaryKey) === table4(foreignKey))
等等
当我尝试使用 :
将我的结果数据帧保存到 postgresql 时,我的应用程序失败了
result.toDF(df.columns.map(x => x.toLowerCase()): _*).write
.mode("overwrite")
.format("jdbc")
.option(JDBCOptions.JDBC_TABLE_NAME, table)
.save()
在我失败的加入阶段,我的任务数量非常少:4 个执行者 6 个任务
为什么我的 Stage 阶段会生成 2 个工作岗位?
第一个完成了426个任务:
第二个失败了:
我的 spark-submit conf :
dynamicAllocation = true
num core = 2
driver memory = 6g
executor memory = 6g
max num executor = 10
min num executor = 1
spark.default.parallelism = 400
spark.sql.shuffle.partitions = 400
我尝试了更多资源但同样的问题:
num core = 5
driver memory = 16g
executor memory = 16g
num executor = 20
我认为即使默认分区数为 400,所有数据都会进入相同的 partition/executor,这会导致 OOM 错误
我试过了(没有成功):
保存数据
broadcastJoin,但是我的table不够小,最后广播不了。
重新分区到更高的数字(4000)并在每个连接之间进行计数以执行操作:
我的主要table接缝增长非常快:
(行数)40 -> 68 -> 7304 -> 946 832 -> 123 032 864 -> 246 064 864 ->(之后的时间太多)
但是数据量接缝非常低
如果我查看任务指标,一件有趣的事情是我的数据接缝倾斜(我真的不确定)
在最后一个计数操作中,我可以看到 ~120 个任务执行操作,输入数据为 ~10MB,用于 100 条记录和 12 秒,而其他 3880 个任务绝对没有做任何事情(3 毫秒,0 条记录 16B(元数据?)):
驱动程序内存 = 16g 内存太高,不需要。仅当您有大量数据要通过 (collect()) 等操作掌握时才使用确保增加 spark.maxResult.size 如果是这种情况
你可以做以下事情
-- 在读取文件时重新分区 readParquet(table1).repartition(x)。如果 table 之一很小,那么你可以广播它并删除连接,而不是使用 mapPartition并使用广播变量作为查找缓存。
(或)
-- Select 一个均匀分布的列,并使用该特定列相应地重新分区 table。
我需要通过查看以上统计信息来强调两点。你的工作有很高的调度延迟,这是由太多任务引起的,你的任务统计很少有输入数据为 10 字节启动,很少有 9MB 启动....显然,这里有数据偏斜......正如你所说的第一个完成了 426 个任务,但以 4000 作为重新分区计数,它应该启动更多任务
请查看 https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c ... 以获得更多见解。
我的 spark 应用程序失败并出现此错误:Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
这是我检查容器日志时得到的:java.lang.OutOfMemoryError: Java heap space
我的应用程序主要是获取 table 然后加入我从 aws S3 读取的不同 table:
var result = readParquet(table1)
val table2 = readParquet(table2)
result = result.join(table2 , result(primaryKey) === table2(foreignKey))
val table3 = readParquet(table3)
result = result.join(table3 , result(primaryKey) === table3(foreignKey))
val table4 = readParquet(table4)
result = result.join(table4 , result(primaryKey) === table4(foreignKey))
等等
当我尝试使用 :
将我的结果数据帧保存到 postgresql 时,我的应用程序失败了result.toDF(df.columns.map(x => x.toLowerCase()): _*).write
.mode("overwrite")
.format("jdbc")
.option(JDBCOptions.JDBC_TABLE_NAME, table)
.save()
在我失败的加入阶段,我的任务数量非常少:4 个执行者 6 个任务
为什么我的 Stage 阶段会生成 2 个工作岗位?
第一个完成了426个任务:
第二个失败了:
我的 spark-submit conf :
dynamicAllocation = true
num core = 2
driver memory = 6g
executor memory = 6g
max num executor = 10
min num executor = 1
spark.default.parallelism = 400
spark.sql.shuffle.partitions = 400
我尝试了更多资源但同样的问题:
num core = 5
driver memory = 16g
executor memory = 16g
num executor = 20
我认为即使默认分区数为 400,所有数据都会进入相同的 partition/executor,这会导致 OOM 错误
我试过了(没有成功):
保存数据
broadcastJoin,但是我的table不够小,最后广播不了。
重新分区到更高的数字(4000)并在每个连接之间进行计数以执行操作:
我的主要table接缝增长非常快:
(行数)40 -> 68 -> 7304 -> 946 832 -> 123 032 864 -> 246 064 864 ->(之后的时间太多)
但是数据量接缝非常低
如果我查看任务指标,一件有趣的事情是我的数据接缝倾斜(我真的不确定)
在最后一个计数操作中,我可以看到 ~120 个任务执行操作,输入数据为 ~10MB,用于 100 条记录和 12 秒,而其他 3880 个任务绝对没有做任何事情(3 毫秒,0 条记录 16B(元数据?)):
驱动程序内存 = 16g 内存太高,不需要。仅当您有大量数据要通过 (collect()) 等操作掌握时才使用确保增加 spark.maxResult.size 如果是这种情况
你可以做以下事情
-- 在读取文件时重新分区 readParquet(table1).repartition(x)。如果 table 之一很小,那么你可以广播它并删除连接,而不是使用 mapPartition并使用广播变量作为查找缓存。
(或)
-- Select 一个均匀分布的列,并使用该特定列相应地重新分区 table。
我需要通过查看以上统计信息来强调两点。你的工作有很高的调度延迟,这是由太多任务引起的,你的任务统计很少有输入数据为 10 字节启动,很少有 9MB 启动....显然,这里有数据偏斜......正如你所说的第一个完成了 426 个任务,但以 4000 作为重新分区计数,它应该启动更多任务
请查看 https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c ... 以获得更多见解。