Is spark.read.load() an action or a transformation? This statement alone takes time

I tried loading data with the code below, and it seems that, even without any other operation, this alone takes a lot of time. The larger the file, the longer it takes.

print("STARTED")

biglog_df = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("path", "bigLog.txt") \
    .load()

print("DONE STARTING")

Printing "DONE STARTING" takes about 20 seconds when the file is 4 GB, and over a minute when it is 25 GB. Does this mean Spark is trying to load the data? So, is load an action?

If the inferSchema option is set to True, the load is not lazily evaluated. In that case, Spark launches a job to scan the file and infer the column types.

You can avoid this behavior by providing the schema when reading the file.
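For example, here is a minimal sketch of supplying a schema up front (the column names and types are hypothetical, since the layout of bigLog.txt is not shown in the question; .schema() accepts a DDL-formatted string as well as a StructType):

# Hypothetical columns for bigLog.txt -- replace with the real ones.
log_schema = "level STRING, datetime TIMESTAMP"

biglog_df = spark.read.format("csv") \
    .option("header", True) \
    .schema(log_schema) \
    .option("path", "bigLog.txt") \
    .load()   # returns immediately: no inferSchema, so no eager scan job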

You can observe the schema-inference job with the following test:

  1. Open a new interactive session in pyspark;
  2. Open the Spark UI > PySpark session > Jobs

and run:

df = (
  spark.read.format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .option("path", "s3a://first-street-climate-risk-statistics-for-noncommercial-use/01_DATA/Climate_Risk_Statistics/v1.3/Zip_level_risk_FEMA_FSF_v1.3.csv")
  .load()
)

You will notice that a job is launched to scan (part of) the file in order to infer the schema.

If, instead, you load the file providing the schema:

import json
from pyspark.sql.types import StructType

json_schema = '{"fields":[{"metadata":{},"name":"zipcode","nullable":true,"type":"integer"},{"metadata":{},"name":"count_property","nullable":true,"type":"integer"},{"metadata":{},"name":"count_fema_sfha","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fema_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_5","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_5","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_5","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_5","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_100","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_100","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_100","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_100","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_500","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_500","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_500","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_500","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_fema_difference_2020","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_fema_difference_2020","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_all","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_2_10","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_fsf_2020_100","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_fsf_2020_500","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_no_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"count_floodfactor1","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor2","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor3","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor4","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor5","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor6","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor7","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor8","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor9","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor10","nullable":true,"type":"integer"}],"type":"struct"}'

schema = StructType.fromJson(json.loads(json_schema))

df = (
  spark.read.format("csv")
  .schema(schema)
  .option("header", True)
  .option("path", "s3a://first-street-climate-risk-statistics-for-noncommercial-use/01_DATA/Climate_Risk_Statistics/v1.3/Zip_level_risk_FEMA_FSF_v1.3.csv")
  .load()
)

Spark will not launch any job, because the schema details are already available to Spark.
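As an aside, a JSON string like the one above does not have to be written by hand. One way to produce it, sketched below, is to infer the schema once (paying the scan job a single time) and serialize it for reuse in later runs:

# One-off run: infer the schema (this does launch a scan job) ...
inferred_df = (
  spark.read.format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .option("path", "s3a://first-street-climate-risk-statistics-for-noncommercial-use/01_DATA/Climate_Risk_Statistics/v1.3/Zip_level_risk_FEMA_FSF_v1.3.csv")
  .load()
)

# ... then serialize the inferred StructType; the resulting string can be
# stored and rebuilt later via StructType.fromJson(json.loads(...)).
json_schema = inferred_df.schema.json()
print(json_schema)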

As @rodrigo already explained,

the csv option inferSchema implies a pass over the entire csv file in order to infer the schema.

You can change this behavior by providing the schema yourself (you can build it by hand, or, if you are using Scala, derive it from a case class), or by using the samplingRatio option to tell Spark how much of the file to scan, which makes setting up the dataframe faster, as shown below.

All of the interesting behaviors are explained in the documentation, which you can find here: Dataframe reader documentation with options for csv file reading

biglog_df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .option("samplingRatio", 0.01)  # infer types from ~1% of the rows
    .option("path", "bigLog.txt")
    .load()
)

Best regards