spark streaming 为每个触发过程间隔的每条记录选择最新事件
spark streaming pick latest event for every record per trigger process interval
我们有一个 spark streaming(spark 版本 2.4.0)作业,它使用一个 Kafka 主题(4 个分区),其中包括业务更改 json 和 Id。
这些 Kafka 值还包括 RecordTime 字段和 json 对象内的其他字段。
此流作业根据 Id 字段更新 Kudu table。
一段时间后,我们注意到,某些更新实际上并未反映某些 id 字段值的最新状态。
我们假设每个分区有 4 个不同的执行程序处理,并且当其中一个比其他执行程序更早完成时,它会更新目标 Kudu table。
所以如果我们有如下值:
(Id=1, val=A, RecordTime: 10:00:05 ) partition1
(Id=2, val=A, RecordTime: 10:00:04 ) partition1
(Id=1, val=B, RecordTime: 10:00:07 ) partition2
(Id=1, val=C, RecordTime: 10:00:06 ) partition3
(Id=2, val=D, RecordTime: 10:00:05 ) partition1
(Id=2, val=C, RecordTime: 10:00:06 ) partition4
(Id=1, val=E, RecordTime: 10:00:03 ) partition4
那么Kudu table应该是这样的:
Id
Value
RecordTime
1
B
10:00:07
2
C
10:00:06
但是,有时我们会看到这样的 Kudu table :
Id
Value
RecordTime
1
A
10:00:05
2
C
10:00:06
触发间隔为 1 分钟。
那么,如何实现目标Kudu的有序更新呢table。
- 我们是否应该使用单个分区进行排序,但如果我们这样做 pros/cons?
- 对于 Spark Streaming,我们如何选择每个触发间隔的最新记录和值
- 根据 id 和 RecordTime 更新 kudu table 但是如何?
- 我们还有什么其他的方法可以考虑吗?
希望我能充分解释我的问题。
简而言之,我们如何在 Spark Streaming 中实现每个微批间隔的事件排序?
特别感谢任何能帮助我的人。
当您从 Kafka 获取数据时,记住 Kafka 仅在主题 partition.
内提供排序保证是很有用的
因此,如果您让 Kafka 生产者将同一 ID 的所有消息生成到同一分区中,则可以解决您的问题。这可以通过 KafkaProducer 中的自定义分区器来实现,或者如果您只是使用 id 的值作为 Kafka 消息的“键”部分。
如果您无法控制 Kafka 生产者,则需要让您的 Spark Streaming 作业有状态。在这里,具有挑战性的部分是定义一个时间范围,您的作业应该等待具有相同 ID 的其他消息到达多长时间。只是几秒钟吗?也许几个小时?我的经验是这可能很难回答,有时答案是“几个小时”,这意味着您需要将状态保持几个小时,这可能会使您的工作内存不足。
我们有一个 spark streaming(spark 版本 2.4.0)作业,它使用一个 Kafka 主题(4 个分区),其中包括业务更改 json 和 Id。 这些 Kafka 值还包括 RecordTime 字段和 json 对象内的其他字段。 此流作业根据 Id 字段更新 Kudu table。
一段时间后,我们注意到,某些更新实际上并未反映某些 id 字段值的最新状态。 我们假设每个分区有 4 个不同的执行程序处理,并且当其中一个比其他执行程序更早完成时,它会更新目标 Kudu table。 所以如果我们有如下值:
(Id=1, val=A, RecordTime: 10:00:05 ) partition1
(Id=2, val=A, RecordTime: 10:00:04 ) partition1
(Id=1, val=B, RecordTime: 10:00:07 ) partition2
(Id=1, val=C, RecordTime: 10:00:06 ) partition3
(Id=2, val=D, RecordTime: 10:00:05 ) partition1
(Id=2, val=C, RecordTime: 10:00:06 ) partition4
(Id=1, val=E, RecordTime: 10:00:03 ) partition4
那么Kudu table应该是这样的:
Id | Value | RecordTime |
---|---|---|
1 | B | 10:00:07 |
2 | C | 10:00:06 |
但是,有时我们会看到这样的 Kudu table :
Id | Value | RecordTime |
---|---|---|
1 | A | 10:00:05 |
2 | C | 10:00:06 |
触发间隔为 1 分钟。
那么,如何实现目标Kudu的有序更新呢table。
- 我们是否应该使用单个分区进行排序,但如果我们这样做 pros/cons?
- 对于 Spark Streaming,我们如何选择每个触发间隔的最新记录和值
- 根据 id 和 RecordTime 更新 kudu table 但是如何?
- 我们还有什么其他的方法可以考虑吗?
希望我能充分解释我的问题。 简而言之,我们如何在 Spark Streaming 中实现每个微批间隔的事件排序?
特别感谢任何能帮助我的人。
当您从 Kafka 获取数据时,记住 Kafka 仅在主题 partition.
内提供排序保证是很有用的因此,如果您让 Kafka 生产者将同一 ID 的所有消息生成到同一分区中,则可以解决您的问题。这可以通过 KafkaProducer 中的自定义分区器来实现,或者如果您只是使用 id 的值作为 Kafka 消息的“键”部分。
如果您无法控制 Kafka 生产者,则需要让您的 Spark Streaming 作业有状态。在这里,具有挑战性的部分是定义一个时间范围,您的作业应该等待具有相同 ID 的其他消息到达多长时间。只是几秒钟吗?也许几个小时?我的经验是这可能很难回答,有时答案是“几个小时”,这意味着您需要将状态保持几个小时,这可能会使您的工作内存不足。