通过 pyspark 胶水作业写入镶木地板文件时,s3 存储桶被删除
s3 bucket is getting deleted while writing parquet files via pyspark glue job
我开发了一个 Pyspark Glue
作业来加载 complete/incremental
数据集。它工作正常。加载数据集后,我必须执行一些 aggregations
并将其以 "overwrite"/"append"
模式写入单个位置。为此,我编写了以下代码:
maxDateValuePath = "s3://...../maxValue/"
outputPath = "s3://..../complete-load/"
aggregatedPath = "s3://...../aggregated-output/"
fullLoad = ""
aggregatedView = ""
completeAggregatedPath = "s3://...../aggregated-output/step=complete-load/"
incrAggregatedPath = "s3://....../aggregated-output/step=incremental-load/"
aggregatedView=""
data.createOrReplaceTempView("data")
aggregatedView = spark.sql("""
select catid,count(*) as number_of_catids from data
group by catid""")
if (incrementalLoad == str(0)):
aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
aggregatedView.write.mode("overwrite").parquet(completeAggregatedPath)
elif (incrementalLoad == str(1)):
aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
log.info("step 123: " + str(aggregatedView.count()))
aggregatedView.write.mode("append").parquet(completeAggregatedPath)
aggregatedView = spark.read.parquet(completeAggregatedPath)
log.info("step 126: " + str(aggregatedView.count()))
w = Window.partitionBy("catid").orderBy(col("created_at").desc())
aggregatedView = aggregatedView.withColumn("rw", row_number().over(w)).filter(col("rw") == lit(1)).drop(
"rw")
log.info("step 130: " + str(aggregatedView.count()))
log.info(aggregatedView.orderBy(col("created_at").desc()).show())
print("::::::::::::before writing::::::::::::::")
aggregatedView.write.mode("overwrite").parquet(incrAggregatedPath)
其中 0
和 1
代表完全 load/incremental 负载。现在,在写入转换后的数据集之前,我添加了一个 created_at
列,用于在写入增量数据集后处理最新的聚合记录,否则会导致重复。
一切都按预期工作正常,但问题是当我尝试使用增量部分的这一行 aggregatedView.write.mode("overwrite").parquet(aggregatedPath)
以覆盖模式写入数据集时,存储桶在 s3 中被删除并且此操作导致下面 error
:
Caused by: java.io.FileNotFoundException: File not present on S3
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
为什么存储桶被删除?
问题出在您的代码中。
您正在读取和写入增量部分中的相同位置。
aggregatedView = spark.read.parquet(aggregatedPath)
...
...
aggregatedView.write.mode("overwrite").parquet(aggregatedPath)
由于 spark 进行了惰性评估,因此当您将模式指定为覆盖时,它会清除特定文件夹中的数据,使您无法阅读任何内容。当它到达代码的写入部分时,它开始读取数据,此时您的数据已经被您的写入操作清除。
因此,我通过更改代码的以下行解决了我的问题:
aggregatedView2 = spark.read.parquet(completeAggregatedPath)
因此对于聚合视图,将有一个 df 沿袭。
由于读取和写入是在相同的 s3 位置和相同的 df 沿袭上执行的,因此它正在删除
前缀,因为 df 的源数据不明确。
因此,创建了一个新的 df,它将在其中查找 S3 位置而不是之前的转换。
也许它会对某人有所帮助!
我开发了一个 Pyspark Glue
作业来加载 complete/incremental
数据集。它工作正常。加载数据集后,我必须执行一些 aggregations
并将其以 "overwrite"/"append"
模式写入单个位置。为此,我编写了以下代码:
maxDateValuePath = "s3://...../maxValue/"
outputPath = "s3://..../complete-load/"
aggregatedPath = "s3://...../aggregated-output/"
fullLoad = ""
aggregatedView = ""
completeAggregatedPath = "s3://...../aggregated-output/step=complete-load/"
incrAggregatedPath = "s3://....../aggregated-output/step=incremental-load/"
aggregatedView=""
data.createOrReplaceTempView("data")
aggregatedView = spark.sql("""
select catid,count(*) as number_of_catids from data
group by catid""")
if (incrementalLoad == str(0)):
aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
aggregatedView.write.mode("overwrite").parquet(completeAggregatedPath)
elif (incrementalLoad == str(1)):
aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
log.info("step 123: " + str(aggregatedView.count()))
aggregatedView.write.mode("append").parquet(completeAggregatedPath)
aggregatedView = spark.read.parquet(completeAggregatedPath)
log.info("step 126: " + str(aggregatedView.count()))
w = Window.partitionBy("catid").orderBy(col("created_at").desc())
aggregatedView = aggregatedView.withColumn("rw", row_number().over(w)).filter(col("rw") == lit(1)).drop(
"rw")
log.info("step 130: " + str(aggregatedView.count()))
log.info(aggregatedView.orderBy(col("created_at").desc()).show())
print("::::::::::::before writing::::::::::::::")
aggregatedView.write.mode("overwrite").parquet(incrAggregatedPath)
其中 0
和 1
代表完全 load/incremental 负载。现在,在写入转换后的数据集之前,我添加了一个 created_at
列,用于在写入增量数据集后处理最新的聚合记录,否则会导致重复。
一切都按预期工作正常,但问题是当我尝试使用增量部分的这一行 aggregatedView.write.mode("overwrite").parquet(aggregatedPath)
以覆盖模式写入数据集时,存储桶在 s3 中被删除并且此操作导致下面 error
:
Caused by: java.io.FileNotFoundException: File not present on S3
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
为什么存储桶被删除?
问题出在您的代码中。 您正在读取和写入增量部分中的相同位置。
aggregatedView = spark.read.parquet(aggregatedPath)
...
...
aggregatedView.write.mode("overwrite").parquet(aggregatedPath)
由于 spark 进行了惰性评估,因此当您将模式指定为覆盖时,它会清除特定文件夹中的数据,使您无法阅读任何内容。当它到达代码的写入部分时,它开始读取数据,此时您的数据已经被您的写入操作清除。
因此,我通过更改代码的以下行解决了我的问题:
aggregatedView2 = spark.read.parquet(completeAggregatedPath)
因此对于聚合视图,将有一个 df 沿袭。 由于读取和写入是在相同的 s3 位置和相同的 df 沿袭上执行的,因此它正在删除 前缀,因为 df 的源数据不明确。 因此,创建了一个新的 df,它将在其中查找 S3 位置而不是之前的转换。
也许它会对某人有所帮助!