Save Spark Dataframe Before Writing to Snowflake

I'm working in PySpark: I apply a series of transformations and user-defined functions before arriving at the final output table that I write to Snowflake. The final write command takes about 25 minutes to run because it also performs all the computation, since Spark evaluates lazily and nothing runs until that last call. I'd like to force evaluation of the final table in an earlier step, so I can measure how long the transformations take and, separately, how long the write to Snowflake takes. How can I separate the two? I tried:
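The lazy behavior described above can be illustrated with a plain Python generator (a sketch, not Spark itself): nothing executes until a terminal call consumes the pipeline, just as Spark defers all work until an action such as show(), collect(), or a write.

```python
import time

def transform(rows):
    # a lazy "transformation": calling this does no work yet
    for r in rows:
        time.sleep(0.01)  # simulate an expensive per-row computation
        yield r * 2

pipeline = transform(range(5))  # returns instantly; nothing evaluated yet
start = time.perf_counter()
result = list(pipeline)         # the "action": all the work happens here
elapsed = time.perf_counter() - start
print(result)  # [0, 2, 4, 6, 8]
```

All of the simulated per-row cost is paid inside the `list(...)` call, which is why timing only the final action lumps the transformations in with it.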

temp = final_df.show() 

temp.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
.option("dbtable","TEST_SPARK").save()

but I get the error:

'NoneType' object has no attribute 'write'

and using collect():

temp = final_df.collect() 

temp.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
.option("dbtable","TEST_SPARK").save()

but I get the error:

'list' object has no attribute 'write'

Your temp variable holds the result of .show(), which returns None, which is why you get the NoneType error; only a DataFrame has the .write method.

Try the code below:

temp = final_df
# show() returns None; call it for display only, then write from the DataFrame
temp.show()

temp.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
.option("dbtable","TEST_SPARK").save()

# collect() returns the data as a Python list and stores it in temp
temp = final_df.collect()

# a list has no .write method, so write from the DataFrame itself
final_df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
.option("dbtable","TEST_SPARK").save()

Update:

import time
start_time = time.time()
# run the transformations up to the first action
temp = final_df
# show() triggers evaluation and displays sample records
temp.show()
end_time = time.time()
print("Total execution time for action: {} seconds".format(end_time - start_time))

start_time_sfw = time.time()
# time the write to Snowflake separately
final_df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
.option("dbtable","TEST_SPARK").save()
end_time_sfw = time.time()
print("Total execution time for writing to Snowflake: {} seconds".format(end_time_sfw - start_time_sfw))
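A small reusable timer keeps the two measurements tidy; this is a minimal sketch using a context manager (plain Python, no Spark required). In the commented usage, final_df, SNOWFLAKE_SOURCE_NAME, and sfOptions2 are the names from the question. Note that count() forces a full scan of every partition, whereas show() may evaluate only enough partitions to print a few rows, so count() gives a more honest measure of the transformation cost.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label):
    # measure wall-clock time of the enclosed block
    start = time.perf_counter()
    yield
    print("{}: {:.2f} seconds".format(label, time.perf_counter() - start))

# usage with the DataFrame from the question (sketch):
# with timed("transformations"):
#     final_df.count()  # count() scans all partitions, unlike show()
# with timed("write to Snowflake"):
#     final_df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
#         .option("dbtable", "TEST_SPARK").save()
```

If the second measurement should exclude recomputation entirely, caching final_df (e.g. final_df.cache()) before the count would keep the materialized data in memory for the write.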