当 ETL 作业被破坏时 Table 获取重复项 twice.ETL 作业从 RDS 获取数据到 S3 存储桶
Getting duplicates in the Table when an ETL job Is ruined twice.ETL job fetch data from RDS to S3 bucket
当 ETL 作业为 运行 时,它会正确执行,但由于 table 没有时间戳,当相同的 ETL 作业为 run.How 时,它会复制数据以执行暂存和解决使用 Upsert 或如果有任何其他问题欢迎您使用此问题 answer.How 我是否可以解决此问题我找到的解决方案是在其中包含时间戳或进行暂存还是有任何其他方式?
U 可以在向 s3 写入数据时使用 overwrite
。它将替换原始数据
为了防止 s3 上的重复,您需要从目标加载数据并在保存之前过滤掉现有记录:
val deltaDf = newDataDf.alias("new")
.join(existingDf.alias("existing"), "id", "left_outer")
.where(col("existing.id").isNull)
.select("new.*")
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> path
)),
transformationContext = "save_to_s3"
format = "avro"
).writeDynamicFrame(DynamicFrame(deltaDf, glueContext))
但是,此方法不会覆盖更新的记录。
另一种选择是使用一些 updated_at
字段来保存更新的记录,下游消费者可以使用这些字段来获取最新值。
您还可以考虑在每次 运行 工作时将数据集转储到单独的文件夹中(即,每天您在 data/dataset_date=<year-month-day>
中都有完整的数据转储)
import org.apache.spark.sql.functions._
val datedDf = sourceDf.withColumn("dataset_date", current_date())
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> path,
"partitionKeys" -> Array("dataset_date")
)),
transformationContext = "save_to_s3"
format = "avro"
).writeDynamicFrame(DynamicFrame(datedDf, glueContext))
当 ETL 作业为 运行 时,它会正确执行,但由于 table 没有时间戳,当相同的 ETL 作业为 run.How 时,它会复制数据以执行暂存和解决使用 Upsert 或如果有任何其他问题欢迎您使用此问题 answer.How 我是否可以解决此问题我找到的解决方案是在其中包含时间戳或进行暂存还是有任何其他方式?
U 可以在向 s3 写入数据时使用 overwrite
。它将替换原始数据
为了防止 s3 上的重复,您需要从目标加载数据并在保存之前过滤掉现有记录:
val deltaDf = newDataDf.alias("new")
.join(existingDf.alias("existing"), "id", "left_outer")
.where(col("existing.id").isNull)
.select("new.*")
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> path
)),
transformationContext = "save_to_s3"
format = "avro"
).writeDynamicFrame(DynamicFrame(deltaDf, glueContext))
但是,此方法不会覆盖更新的记录。
另一种选择是使用一些 updated_at
字段来保存更新的记录,下游消费者可以使用这些字段来获取最新值。
您还可以考虑在每次 运行 工作时将数据集转储到单独的文件夹中(即,每天您在 data/dataset_date=<year-month-day>
中都有完整的数据转储)
import org.apache.spark.sql.functions._
val datedDf = sourceDf.withColumn("dataset_date", current_date())
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> path,
"partitionKeys" -> Array("dataset_date")
)),
transformationContext = "save_to_s3"
format = "avro"
).writeDynamicFrame(DynamicFrame(datedDf, glueContext))