将具有多个相同密钥的流写入到 delta lake
Stream writes having multiple identical keys to delta lake
我正在通过 spark 结构化流将流写入 delta lake。每个流式批次都包含键值(还包含时间戳作为一列)。 delta lake 不支持在源(蒸批处理)上使用多个相同的密钥进行更新,所以我想更新 delta lake,仅使用具有最新时间戳的记录。我该怎么做?
这是我正在尝试的代码片段:
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
println(s"Executing batch $batchId ...")
microBatchOutputDF.show()
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
提前致谢。
您可以从 "microBatchOutputDF" 数据框中删除具有较早时间戳的记录,并仅保留具有给定键的最新时间戳的记录。
您可以使用 spark 的 'reduceByKey' 操作并实现自定义 reduce 函数,如下所示。
def getLatestEvents(input: DataFrame) : RDD[Row] = {
input.rdd.map(x => (x.getAs[String]("key"), x)).reduceByKey(reduceFun).map(_._2) }
def reduceFun(x: Row, y: Row) : Row = {
if (x.getAs[Timestamp]("timestamp").getTime > y.getAs[Timestamp]("timestamp").getTime) x else y }
假定键是字符串类型,时间戳是时间戳类型。并为您的流式处理批次 'microBatchOutputDF' 调用 "getLatestEvents"。它忽略较旧的时间戳事件并仅保留最新的时间戳事件。
val latestRecordsDF = spark.createDataFrame(getLatestEvents(microBatchOutputDF), <schema of DF>)
然后在'latestRecordsDF'
之上调用deltalake合并操作
在微批处理的流式处理中,对于给定的键,您可能会获得多个记录。为了用目标 table 更新它,您必须找出微批中键的最新记录。在您的情况下,您可以使用时间戳列的最大值和值列来查找最新记录并将该记录用于合并操作。
您可以参考此 link 以了解有关查找给定键的最新记录的更多详细信息。
我正在通过 spark 结构化流将流写入 delta lake。每个流式批次都包含键值(还包含时间戳作为一列)。 delta lake 不支持在源(蒸批处理)上使用多个相同的密钥进行更新,所以我想更新 delta lake,仅使用具有最新时间戳的记录。我该怎么做?
这是我正在尝试的代码片段:
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
println(s"Executing batch $batchId ...")
microBatchOutputDF.show()
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
提前致谢。
您可以从 "microBatchOutputDF" 数据框中删除具有较早时间戳的记录,并仅保留具有给定键的最新时间戳的记录。
您可以使用 spark 的 'reduceByKey' 操作并实现自定义 reduce 函数,如下所示。
def getLatestEvents(input: DataFrame) : RDD[Row] = {
input.rdd.map(x => (x.getAs[String]("key"), x)).reduceByKey(reduceFun).map(_._2) }
def reduceFun(x: Row, y: Row) : Row = {
if (x.getAs[Timestamp]("timestamp").getTime > y.getAs[Timestamp]("timestamp").getTime) x else y }
假定键是字符串类型,时间戳是时间戳类型。并为您的流式处理批次 'microBatchOutputDF' 调用 "getLatestEvents"。它忽略较旧的时间戳事件并仅保留最新的时间戳事件。
val latestRecordsDF = spark.createDataFrame(getLatestEvents(microBatchOutputDF), <schema of DF>)
然后在'latestRecordsDF'
之上调用deltalake合并操作在微批处理的流式处理中,对于给定的键,您可能会获得多个记录。为了用目标 table 更新它,您必须找出微批中键的最新记录。在您的情况下,您可以使用时间戳列的最大值和值列来查找最新记录并将该记录用于合并操作。
您可以参考此 link 以了解有关查找给定键的最新记录的更多详细信息。