如何根据火花结构化流中的时间戳字段进行重复数据删除和保持最新?
How to deduplicate and keep latest based on timestamp field in spark structured streaming?
Spark dropDuplicates
保留第一个实例并忽略该键的所有后续事件。是否可以在保留最近出现的情况的同时删除重复项?
例如,如果下面是我得到的微批次,那么我想保留每个国家/地区的最新记录(按时间戳字段排序)。
batchId: 0
Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06
batchId: 1
Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:03
那么batchId 1之后的输出应该在-
下面
Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:06
Update-1
这是我目前的代码
//KafkaDF is a streaming dataframe created from Kafka as source
val streamingDF = kafkaDF.dropDuplicates("country")
streamingDF.writeStream
.trigger(Trigger.ProcessingTime(10000L))
.outputMode("update")
.foreachBatch {
(batchDF: DataFrame, batchId: Long) => {
println("batchId: "+ batchId)
batchDF.show()
}
}.start()
我想输出所有行,这些行要么是新的,要么具有比迄今为止处理的先前批次中的任何记录更大的时间戳。下面的例子
在 batchId 之后:0 - 这两个国家都是第一次出现所以我应该把它们放在输出中
Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06
在 batchId 之后:1 - 白俄罗斯的时间戳比我们在批次 0 中收到的时间戳要早,所以我没有在输出中显示它。显示的澳大利亚是因为它的时间戳比我目前看到的要新。
Australia, 10, 2020-05-05 00:00:08
现在假设 batchId 2 出现了两条迟到记录,那么它不应该在该批次的输出中显示任何内容。
输入batchId: 2
Australia, 10, 2020-05-05 00:00:01
Belarus, 10, 2020-05-05 00:00:01
batchId后:2
.
Update-2
为每个批次添加输入和预期记录。标有红色的行将被丢弃,并且不会在输出中显示为具有相同国家名称的另一行,并且在之前的批次中可以看到更新的时间戳
尝试在 spark streaming 中使用window
函数,例如检查下面。
val columns = Seq("country","id").map(col(_))
df.groupBy(window($"timestamp","10 minutes","5 minutes"), columns:_*)
您也可以检查相同的 ,解决方案在 python。
为了避免流式应用程序中的事件延迟到达,您需要在应用程序中保留一个状态,以跟踪每个键的最新处理事件,在您的情况下是 country
。
case class AppState(country:String, latestTs:java.sql.Timestamp)
对于一个微批次,你可能会收到多个事件,当你这样做时 groupByKey(_.country)
你会得到一个属于 key(country)
的事件,你需要将它与状态进行比较以找到最新输入事件并使用密钥的最新时间戳更新状态,并继续处理最新事件以进行进一步处理。对于迟到的事件,它应该return一个Option[Event]
并在后续过程中过滤掉。
请参阅此 blog 了解详细说明。
Spark dropDuplicates
保留第一个实例并忽略该键的所有后续事件。是否可以在保留最近出现的情况的同时删除重复项?
例如,如果下面是我得到的微批次,那么我想保留每个国家/地区的最新记录(按时间戳字段排序)。
batchId: 0
Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06
batchId: 1
Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:03
那么batchId 1之后的输出应该在-
下面Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:06
Update-1 这是我目前的代码
//KafkaDF is a streaming dataframe created from Kafka as source
val streamingDF = kafkaDF.dropDuplicates("country")
streamingDF.writeStream
.trigger(Trigger.ProcessingTime(10000L))
.outputMode("update")
.foreachBatch {
(batchDF: DataFrame, batchId: Long) => {
println("batchId: "+ batchId)
batchDF.show()
}
}.start()
我想输出所有行,这些行要么是新的,要么具有比迄今为止处理的先前批次中的任何记录更大的时间戳。下面的例子
在 batchId 之后:0 - 这两个国家都是第一次出现所以我应该把它们放在输出中
Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06
在 batchId 之后:1 - 白俄罗斯的时间戳比我们在批次 0 中收到的时间戳要早,所以我没有在输出中显示它。显示的澳大利亚是因为它的时间戳比我目前看到的要新。
Australia, 10, 2020-05-05 00:00:08
现在假设 batchId 2 出现了两条迟到记录,那么它不应该在该批次的输出中显示任何内容。
输入batchId: 2
Australia, 10, 2020-05-05 00:00:01
Belarus, 10, 2020-05-05 00:00:01
batchId后:2
.
Update-2
为每个批次添加输入和预期记录。标有红色的行将被丢弃,并且不会在输出中显示为具有相同国家名称的另一行,并且在之前的批次中可以看到更新的时间戳
尝试在 spark streaming 中使用window
函数,例如检查下面。
val columns = Seq("country","id").map(col(_))
df.groupBy(window($"timestamp","10 minutes","5 minutes"), columns:_*)
您也可以检查相同的
为了避免流式应用程序中的事件延迟到达,您需要在应用程序中保留一个状态,以跟踪每个键的最新处理事件,在您的情况下是 country
。
case class AppState(country:String, latestTs:java.sql.Timestamp)
对于一个微批次,你可能会收到多个事件,当你这样做时 groupByKey(_.country)
你会得到一个属于 key(country)
的事件,你需要将它与状态进行比较以找到最新输入事件并使用密钥的最新时间戳更新状态,并继续处理最新事件以进行进一步处理。对于迟到的事件,它应该return一个Option[Event]
并在后续过程中过滤掉。
请参阅此 blog 了解详细说明。