为什么在完成作业和关闭 Spark 之间会发生磁盘繁忙峰值?
Why does a disk busy spike happen between finishing of a job and shutting down Spark?
我在完成所有 spark 任务后检测到意外的磁盘 IO(DISKBUSY 尖峰)
已完成但 spark 上下文尚未停止——如图案例 2 所示
21:56:47
。谁能帮忙解释一下,并给出如何避免的建议
还是推迟?或者 spark 上下文是否有一些周期性的异步 IO
可能导致尖峰的活动?谢谢!
给出了一个例子运行一个SparkSQL批处理作业在两种情况下。在第一
一,我执行 sql 工作负载,并在之后立即停止 spark context
.show()
动作结束。在第二种情况下,我在之后增加了 1 分钟的睡眠时间
.show()
通过使用 Thread.sleep(60000)
,然后停止 spark 上下文。结果表明,两种情况下执行 sql 工作负载的时间成本相似,但是 正在磁盘上出现意外的 DISKBUSY 尖峰 正在为 shuffle write 进行本地存储在第二种情况下。请参见案例 2 中的尖峰。
这里有更多详细信息。
系统设置
- 用于元数据存储的 Spark 2.3.1、Hadoop 2.9.1、Hive 2.3.4。
- 一个主节点和两个工作节点(worker1 和 worker2)。每个节点都有足够的可用资源
(32核,750G内存,disk1到disk8共8个8T磁盘)。
- HDFS部署在disk8; disk1 用于 spark shuffle 写入本地存储。
- 我使用 Yarn 作为集群管理。
- 我使用系统监控工具"nmon" 来检测磁盘活动。
- 后台没有其他大应用运行。
- 我在提交代码时使用
yarn client
模式。我使用 8 个执行器,每个执行器都有 4 个内核和 8GB 内存。
- 注意,我把HDFS和Yarn本地文件放在两个不同的磁盘上——
yarn_local
目录在每个worker的disk1上,HDFS部署在两个worker节点的disk8s上。每个磁盘有8T
。这样就可以区分HDFS和本地磁盘的活动了。
这是我目前的分析
- 不是磁盘本身和其他后台进程造成的。我尝试将 disk2、disk3、disk4 和 disk8 用于 yarn 本地存储,以测试尖峰是否与程序相关,并且每次执行案例 2 时它都显示相同的尖峰。
- 峰值是由 Spark 本身引起的。我尝试了独立部署模式,但峰值仍然存在(没有 Yarn)。
- 可能与洗牌有关。我的目标批处理作业的总随机写入大小接近
2GB
。我还尝试了不同的工作负载,其随机写入大小接近 1MB
、250MB
和 1GB
。对于改组写入大小 1MB
的批处理作业,DISKBUSY 变得可以忽略不计,对于总改组写入大小 250MB
的批处理作业,DISKBUSY 变得高达 80%
。
- 跟踪本地存储文件的大小。当出现磁盘尖峰时,检测到磁盘写入但磁盘大小没有增加。因此,(1)它可能与磁盘缓存清理无关(2)它可能发生了一些磁盘交换(不太确定)。
根据我目前的分析,我怀疑它应该是由我不熟悉的东西引起的 -- 例如磁盘上的一些火花异步行为。谁能帮忙解释一下?谢谢!
这里是第一种情况。
这里是第二种情况。
图中为了更清楚,worker1 node local
代表worker1中的disk1,the worker2 local
代表worker2中的disk1; worker1 node dfs
代表worker1中的disk8,worker2 node dfs
代表worker2中的disk8,HDFS所在的位置。左边的 y 轴是 nmon
检测到的 diskbusy(从 0% 到 100%),右边的 y 轴是 disk8 中 hdfs 目录的大小(对于这个问题我们可以忽略)。
这是我的代码。
import org.apache.spark.sql.SparkSession
object Q16 {
def main(args: Array[String]): Unit = {
val db = s"bigbench_sf_100"
val spark = SparkSession
.builder()
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
spark.sql(s"use $db")
val t1 = System.currentTimeMillis()
spark.sql(
s"""
|SELECT w_state, i_item_id,
| SUM(
| CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
| THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
| ELSE 0.0 END
| ) AS sales_before,
| SUM(
| CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
| THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
| ELSE 0.0 END
| ) AS sales_after
|FROM (
| SELECT *
| FROM web_sales ws
| LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
| AND ws.ws_item_sk = wr.wr_item_sk)
|) a1
|JOIN item i ON a1.ws_item_sk = i.i_item_sk
|JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
|JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
|AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >= unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
|AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
|GROUP BY w_state,i_item_id
|--original was ORDER BY w_state,i_item_id , but CLUSTER BY is hives cluster scale counter part
|ORDER BY w_state,i_item_id
|LIMIT 100
""".stripMargin).show
val t2 = System.currentTimeMillis()
// For case 2
// Thread.sleep(60 * 1000)
spark.stop()
}
}
我找出了意外 IO 的原因 activity。
这是文件系统缓冲区缓存行为。通常,当进程写入文件时,数据不会立即写入磁盘,而是写入内存中的缓存。此缓存由 OS/file 系统维护,作为性能优化,因为它允许在写入内存后向 return 写入请求,而不是等待缓慢的 I/Os 完成。 OS 在后台定期将这些脏数据刷新到磁盘。
所以基本上,磁盘活动(刷新)是不可避免的,除非文件页面在缓存在磁盘缓冲区中时被删除(情况 1)。
您可以使用Linux系统命令sync
.
强制立即写出所有脏数据
我在完成所有 spark 任务后检测到意外的磁盘 IO(DISKBUSY 尖峰)
已完成但 spark 上下文尚未停止——如图案例 2 所示
21:56:47
。谁能帮忙解释一下,并给出如何避免的建议
还是推迟?或者 spark 上下文是否有一些周期性的异步 IO
可能导致尖峰的活动?谢谢!
给出了一个例子运行一个SparkSQL批处理作业在两种情况下。在第一
一,我执行 sql 工作负载,并在之后立即停止 spark context
.show()
动作结束。在第二种情况下,我在之后增加了 1 分钟的睡眠时间
.show()
通过使用 Thread.sleep(60000)
,然后停止 spark 上下文。结果表明,两种情况下执行 sql 工作负载的时间成本相似,但是 正在磁盘上出现意外的 DISKBUSY 尖峰 正在为 shuffle write 进行本地存储在第二种情况下。请参见案例 2 中的尖峰。
这里有更多详细信息。
系统设置
- 用于元数据存储的 Spark 2.3.1、Hadoop 2.9.1、Hive 2.3.4。
- 一个主节点和两个工作节点(worker1 和 worker2)。每个节点都有足够的可用资源 (32核,750G内存,disk1到disk8共8个8T磁盘)。
- HDFS部署在disk8; disk1 用于 spark shuffle 写入本地存储。
- 我使用 Yarn 作为集群管理。
- 我使用系统监控工具"nmon" 来检测磁盘活动。
- 后台没有其他大应用运行。
- 我在提交代码时使用
yarn client
模式。我使用 8 个执行器,每个执行器都有 4 个内核和 8GB 内存。 - 注意,我把HDFS和Yarn本地文件放在两个不同的磁盘上——
yarn_local
目录在每个worker的disk1上,HDFS部署在两个worker节点的disk8s上。每个磁盘有8T
。这样就可以区分HDFS和本地磁盘的活动了。
这是我目前的分析
- 不是磁盘本身和其他后台进程造成的。我尝试将 disk2、disk3、disk4 和 disk8 用于 yarn 本地存储,以测试尖峰是否与程序相关,并且每次执行案例 2 时它都显示相同的尖峰。
- 峰值是由 Spark 本身引起的。我尝试了独立部署模式,但峰值仍然存在(没有 Yarn)。
- 可能与洗牌有关。我的目标批处理作业的总随机写入大小接近
2GB
。我还尝试了不同的工作负载,其随机写入大小接近1MB
、250MB
和1GB
。对于改组写入大小1MB
的批处理作业,DISKBUSY 变得可以忽略不计,对于总改组写入大小250MB
的批处理作业,DISKBUSY 变得高达80%
。 - 跟踪本地存储文件的大小。当出现磁盘尖峰时,检测到磁盘写入但磁盘大小没有增加。因此,(1)它可能与磁盘缓存清理无关(2)它可能发生了一些磁盘交换(不太确定)。
根据我目前的分析,我怀疑它应该是由我不熟悉的东西引起的 -- 例如磁盘上的一些火花异步行为。谁能帮忙解释一下?谢谢!
这里是第一种情况。
这里是第二种情况。
图中为了更清楚,worker1 node local
代表worker1中的disk1,the worker2 local
代表worker2中的disk1; worker1 node dfs
代表worker1中的disk8,worker2 node dfs
代表worker2中的disk8,HDFS所在的位置。左边的 y 轴是 nmon
检测到的 diskbusy(从 0% 到 100%),右边的 y 轴是 disk8 中 hdfs 目录的大小(对于这个问题我们可以忽略)。
这是我的代码。
import org.apache.spark.sql.SparkSession
object Q16 {
def main(args: Array[String]): Unit = {
val db = s"bigbench_sf_100"
val spark = SparkSession
.builder()
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
spark.sql(s"use $db")
val t1 = System.currentTimeMillis()
spark.sql(
s"""
|SELECT w_state, i_item_id,
| SUM(
| CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
| THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
| ELSE 0.0 END
| ) AS sales_before,
| SUM(
| CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
| THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
| ELSE 0.0 END
| ) AS sales_after
|FROM (
| SELECT *
| FROM web_sales ws
| LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
| AND ws.ws_item_sk = wr.wr_item_sk)
|) a1
|JOIN item i ON a1.ws_item_sk = i.i_item_sk
|JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
|JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
|AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >= unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
|AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
|GROUP BY w_state,i_item_id
|--original was ORDER BY w_state,i_item_id , but CLUSTER BY is hives cluster scale counter part
|ORDER BY w_state,i_item_id
|LIMIT 100
""".stripMargin).show
val t2 = System.currentTimeMillis()
// For case 2
// Thread.sleep(60 * 1000)
spark.stop()
}
}
我找出了意外 IO 的原因 activity。
这是文件系统缓冲区缓存行为。通常,当进程写入文件时,数据不会立即写入磁盘,而是写入内存中的缓存。此缓存由 OS/file 系统维护,作为性能优化,因为它允许在写入内存后向 return 写入请求,而不是等待缓慢的 I/Os 完成。 OS 在后台定期将这些脏数据刷新到磁盘。
所以基本上,磁盘活动(刷新)是不可避免的,除非文件页面在缓存在磁盘缓冲区中时被删除(情况 1)。
您可以使用Linux系统命令sync
.