Apache Spark/Azure Data Lake Storage - 仅处理一次文件,将文件标记为已处理
Apache Spark/Azure Data Lake Storage - Process the file exactly once, tag the file as processed
我有一个 Azure Data Lake Storage 容器,它充当 JSON 文件的着陆区以供 Apache Spark 处理。
那里有数以万计的小文件(最多几 MB)。 Spark 代码定期读取这些文件并执行一些转换。
我希望文件只被读取一次并且 Spark 脚本是幂等的。
如何确保文件不会被反复读取?我该如何有效地做到这一点?
我是这样读取数据的:
spark.read.json("/mnt/input_location/*.json")
我想到了以下方法:
- 使用已经处理过的文件名和 运行 输入 DataFrame 上的 EXCEPT 转换创建增量 table
- 将处理过的文件移动到不同的位置(或重命名)。我宁愿不那样做。如果我需要重新处理数据,我需要运行重新命名一次这个操作需要很长时间。
希望有更好的办法。请提出一些建议。
您可以使用启用了检查点的结构化流作业和 Trigger.Once
。
该作业的检查点文件将跟踪作业已使用的 JSON 文件。此外,Trigger.Once
触发器将使此流式处理作业如同批处理作业一样。
Databricks 有一篇很好的文章解释了“为什么 Streaming 和 RunOnce 比 Batch 更好”。
您的结构化流式传输作业可能如下所示:
val checkpointLocation = "/path/to/checkpoints"
val pathToJsonFiles = "/mnt/input_location/"
val streamDF = spark.readStream.format("json").schema(jsonSchema).load(pathToJsonFiles)
val query = streamDF
.[...] // apply your processing
.writeStream
.format("console") // change sink format accordingly
.option("checkpointLocation", checkpointLocation)
.trigger(Trigger.Once)
.start()
query.awaitTermination()
我有一个 Azure Data Lake Storage 容器,它充当 JSON 文件的着陆区以供 Apache Spark 处理。
那里有数以万计的小文件(最多几 MB)。 Spark 代码定期读取这些文件并执行一些转换。
我希望文件只被读取一次并且 Spark 脚本是幂等的。 如何确保文件不会被反复读取?我该如何有效地做到这一点?
我是这样读取数据的:
spark.read.json("/mnt/input_location/*.json")
我想到了以下方法:
- 使用已经处理过的文件名和 运行 输入 DataFrame 上的 EXCEPT 转换创建增量 table
- 将处理过的文件移动到不同的位置(或重命名)。我宁愿不那样做。如果我需要重新处理数据,我需要运行重新命名一次这个操作需要很长时间。
希望有更好的办法。请提出一些建议。
您可以使用启用了检查点的结构化流作业和 Trigger.Once
。
该作业的检查点文件将跟踪作业已使用的 JSON 文件。此外,Trigger.Once
触发器将使此流式处理作业如同批处理作业一样。
Databricks 有一篇很好的文章解释了“为什么 Streaming 和 RunOnce 比 Batch 更好”。
您的结构化流式传输作业可能如下所示:
val checkpointLocation = "/path/to/checkpoints"
val pathToJsonFiles = "/mnt/input_location/"
val streamDF = spark.readStream.format("json").schema(jsonSchema).load(pathToJsonFiles)
val query = streamDF
.[...] // apply your processing
.writeStream
.format("console") // change sink format accordingly
.option("checkpointLocation", checkpointLocation)
.trigger(Trigger.Once)
.start()
query.awaitTermination()