Is spark.read.load() an action or a transformation? This statement alone takes time
I tried loading data with the code below, and it seems to take a long time even without any further operations. The bigger the file, the longer it takes:
print("STARTED")
biglog_df = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("path", "bigLog.txt") \
    .load()
print("DONE STARTING")
Printing "DONE STARTING" takes about 20 seconds when the file is 4 GB, and over a minute when it is 25 GB. Does this mean Spark is actually trying to load the data? Is load an action, then?
If the inferSchema option is set to True, the load is not lazily evaluated. In that case, Spark launches a job that scans the file to infer the column types.
You can avoid this behavior by supplying the schema when reading the file.
You can observe this behavior with the following test:
- open a new interactive pyspark session;
- open the Spark UI > Pyspark session > Jobs
and run:
df = (
spark.read.format("csv")
.option("header", True)
.option("inferSchema", True)
.option("path", "s3a://first-street-climate-risk-statistics-for-noncommercial-use/01_DATA/Climate_Risk_Statistics/v1.3/Zip_level_risk_FEMA_FSF_v1.3.csv")
.load()
)
You will notice that a job is launched to scan (part of) the file to infer the schema.
If instead you load the file supplying the schema:
import json
from pyspark.sql.types import StructType
json_schema = '{"fields":[{"metadata":{},"name":"zipcode","nullable":true,"type":"integer"},{"metadata":{},"name":"count_property","nullable":true,"type":"integer"},{"metadata":{},"name":"count_fema_sfha","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fema_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_5","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_5","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_5","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_5","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_100","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_100","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_100","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_100","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_500","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_500","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_500","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_500","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_fema_difference_2020","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_fema_difference_2020","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_all","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_2_10","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_fsf_2020_100","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_fsf_2020_500","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_no_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"count_floodfactor1","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor2","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor3","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor4","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor5","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor6","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor7","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor8","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor9","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor10","nullable":true,"type":"integer"}],"type":"struct"}'
schema = StructType.fromJson(json.loads(json_schema))
df = (
spark.read.format("csv")
.schema(schema)
.option("header", True)
.option("path", "s3a://first-street-climate-risk-statistics-for-noncommercial-use/01_DATA/Climate_Risk_Statistics/v1.3/Zip_level_risk_FEMA_FSF_v1.3.csv")
.load()
)
Spark will not launch any job, because the schema details are already available to the reader.
As @rodrigo already explained, the csv option inferSchema implies a pass over the csv file to infer the schema. You can change this behavior by providing the schema yourself (creating it manually, or with a case class if you are using Scala), or by using the samplingRatio option to indicate how much of the file should be scanned, making the setup of the dataframe faster.
All these interesting behaviors are explained in the documentation, which you can find here:
Dataframe reader documentation with options for csv file reading
biglog_df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .option("samplingRatio", 0.01)
    .option("path", "bigLog.txt")
    .load()
)
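Besides building a StructType by hand, DataFrameReader.schema() also accepts a DDL-formatted string, which is often the quickest way to skip inference from Python. The column names below are illustrative only, since the real layout of bigLog.txt isn't shown:

```python
# DDL-formatted schema string; the column names are hypothetical.
ddl_schema = "level STRING, datetime STRING, message STRING"

# Passing it to the reader skips schema inference entirely:
# biglog_df = (
#     spark.read.format("csv")
#     .option("header", True)
#     .schema(ddl_schema)
#     .option("path", "bigLog.txt")
#     .load()
# )
print(ddl_schema)
```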
Best regards