Delta Lake 将多个文件压缩为单个文件
Delta Lake Compacting Multiple files to single file
我目前正在探索由 databricks 开源的 delta lake。我正在读取 kafka 数据并使用 delta lake 格式作为流写入。 Delta lake 在从 kafka 进行流式写入期间创建了许多文件,我觉得它是 hdfs 文件系统。
我试过将多个文件压缩成一个文件。
val spark = SparkSession.builder
.master("local")
.appName("spark session example")
.getOrCreate()
val df = spark.read.parquet("deltalakefile/data/")
df.repartition(1).write.format("delta").mode("overwrite").save("deltalakefile/data/")
df.show()
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled","false")
DeltaTable.forPath("deltalakefile/data/").vacuum(1)
但是当我检查输出时它正在创建新文件而不是删除任何现有文件。
有没有办法实现这个。还有这里保留期的关系是什么?我们在使用的时候在HDFS中应该如何配置呢?当我想构建具有 delta lake 格式的 raw/bronze 层并且我想长期保留我的所有数据(云上 premises/infinite 年)时,我的保留配置应该是什么?
根据设计,Delta 不会立即删除文件以防止活跃消费者受到影响。它还提供版本控制(又名时间旅行),因此您可以在必要时查看历史记录。要删除以前的版本或未提交的文件,您需要 运行 vacuum.
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
deltaTable.vacuum() // use default retention period
关于如何管理 bronze/silver/gold 模型的保留和压缩的问题,您应该将登陆 table(又名青铜)视为仅附加日志。这意味着您不需要在事后执行压缩或任何重写。青铜 table 应该是您从上游数据源(例如 Kafka)提取的数据的记录,应用了最少的处理。
青铜 table 通常用作增量流源来填充下游数据集。鉴于从 Delta 读取是从事务日志中完成的,与使用执行慢速文件列表的标准文件读取器相比,小文件不是问题。
但是,在将文件写入青铜器时,仍有一些选项可以优化文件 table:1) 在写入 Delta 时压缩 Kafka 消息,方法是首先重新分区以减少文件,2) 增加您的触发间隔,因此摄取 运行 的频率降低,并且将更多消息写入更大的文件。
我目前正在探索由 databricks 开源的 delta lake。我正在读取 kafka 数据并使用 delta lake 格式作为流写入。 Delta lake 在从 kafka 进行流式写入期间创建了许多文件,我觉得它是 hdfs 文件系统。
我试过将多个文件压缩成一个文件。
val spark = SparkSession.builder
.master("local")
.appName("spark session example")
.getOrCreate()
val df = spark.read.parquet("deltalakefile/data/")
df.repartition(1).write.format("delta").mode("overwrite").save("deltalakefile/data/")
df.show()
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled","false")
DeltaTable.forPath("deltalakefile/data/").vacuum(1)
但是当我检查输出时它正在创建新文件而不是删除任何现有文件。
有没有办法实现这个。还有这里保留期的关系是什么?我们在使用的时候在HDFS中应该如何配置呢?当我想构建具有 delta lake 格式的 raw/bronze 层并且我想长期保留我的所有数据(云上 premises/infinite 年)时,我的保留配置应该是什么?
根据设计,Delta 不会立即删除文件以防止活跃消费者受到影响。它还提供版本控制(又名时间旅行),因此您可以在必要时查看历史记录。要删除以前的版本或未提交的文件,您需要 运行 vacuum.
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
deltaTable.vacuum() // use default retention period
关于如何管理 bronze/silver/gold 模型的保留和压缩的问题,您应该将登陆 table(又名青铜)视为仅附加日志。这意味着您不需要在事后执行压缩或任何重写。青铜 table 应该是您从上游数据源(例如 Kafka)提取的数据的记录,应用了最少的处理。
青铜 table 通常用作增量流源来填充下游数据集。鉴于从 Delta 读取是从事务日志中完成的,与使用执行慢速文件列表的标准文件读取器相比,小文件不是问题。
但是,在将文件写入青铜器时,仍有一些选项可以优化文件 table:1) 在写入 Delta 时压缩 Kafka 消息,方法是首先重新分区以减少文件,2) 增加您的触发间隔,因此摄取 运行 的频率降低,并且将更多消息写入更大的文件。