为什么在完成作业和关闭 Spark 之间会发生磁盘繁忙峰值?

Why does a disk busy spike happen between finishing of a job and shutting down Spark?

我在完成所有 spark 任务后检测到意外的磁盘 IO(DISKBUSY 尖峰) 已完成但 spark 上下文尚未停止——如图案例 2 所示 21:56:47。谁能帮忙解释一下,并给出如何避免的建议 还是推迟?或者 spark 上下文是否有一些周期性的异步 IO 可能导致尖峰的活动?谢谢!

给出了一个例子运行一个SparkSQL批处理作业在两种情况下。在第一 一,我执行 sql 工作负载,并在之后立即停止 spark context .show() 动作结束。在第二种情况下,我在之后增加了 1 分钟的睡眠时间 .show() 通过使用 Thread.sleep(60000),然后停止 spark 上下文。结果表明,两种情况下执行 sql 工作负载的时间成本相似,但是 正在磁盘上出现意外的 DISKBUSY 尖峰 正在为 shuffle write 进行本地存储在第二种情况下。请参见案例 2 中的尖峰。

这里有更多详细信息。

系统设置

这是我目前的分析

  1. 不是磁盘本身和其他后台进程造成的。我尝试将 disk2、disk3、disk4 和 disk8 用于 yarn 本地存储,以测试尖峰是否与程序相关,并且每次执行案例 2 时它都显示相同的尖峰。
  2. 峰值是由 Spark 本身引起的。我尝试了独立部署模式,但峰值仍然存在(没有 Yarn)。
  3. 可能与洗牌有关。我的目标批处理作业的总随机写入大小接近 2GB。我还尝试了不同的工作负载,其随机写入大小接近 1MB250MB1GB。对于改组写入大小 1MB 的批处理作业,DISKBUSY 变得可以忽略不计,对于总改组写入大小 250MB 的批处理作业,DISKBUSY 变得高达 80%
  4. 跟踪本地存储文件的大小。当出现磁盘尖峰时,检测到磁盘写入但磁盘大小没有增加。因此,(1)它可能与磁盘缓存清理无关(2)它可能发生了一些磁盘交换(不太确定)。

根据我目前的分析,我怀疑它应该是由我不熟悉的东西引起的 -- 例如磁盘上的一些火花异步行为。谁能帮忙解释一下?谢谢!

这里是第一种情况。

这里是第二种情况。

图中为了更清楚,worker1 node local代表worker1中的disk1,the worker2 local代表worker2中的disk1; worker1 node dfs代表worker1中的disk8,worker2 node dfs代表worker2中的disk8,HDFS所在的位置。左边的 y 轴是 nmon 检测到的 diskbusy(从 0% 到 100%),右边的 y 轴是 disk8 中 hdfs 目录的大小(对于这个问题我们可以忽略)。

这是我的代码。

import org.apache.spark.sql.SparkSession
object Q16 {
  def main(args: Array[String]): Unit = {
    val db = s"bigbench_sf_100"

    val spark = SparkSession
      .builder()
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    spark.sql(s"use $db")

    val t1 = System.currentTimeMillis()
    spark.sql(
      s"""
         |SELECT w_state, i_item_id,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_before,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_after
         |FROM (
         |  SELECT *
         |  FROM web_sales ws
         |  LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
         |    AND ws.ws_item_sk = wr.wr_item_sk)
         |) a1
         |JOIN item i ON a1.ws_item_sk = i.i_item_sk
         |JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
         |JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >= unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
         |GROUP BY w_state,i_item_id
         |--original was ORDER BY w_state,i_item_id , but CLUSTER BY is hives cluster scale counter part
         |ORDER BY w_state,i_item_id
         |LIMIT 100
       """.stripMargin).show
    val t2 = System.currentTimeMillis()

//    For case 2
//    Thread.sleep(60 * 1000)

    spark.stop()
  }
}

我找出了意外 IO 的原因 activity。

这是文件系统缓冲区缓存行为。通常,当进程写入文件时,数据不会立即写入磁盘,而是写入内存中的缓存。此缓存由 OS/file 系统维护,作为性能优化,因为它允许在写入内存后向 return 写入请求,而不是等待缓慢的 I/Os 完成。 OS 在后台定期将这些脏数据刷新到磁盘。

所以基本上,磁盘活动(刷新)是不可避免的,除非文件页面在缓存在磁盘缓冲区中时被删除(情况 1)。

您可以使用Linux系统命令sync.

强制立即写出所有脏数据