Spark streaming 的批量查找数据
Batch lookup data for Spark streaming
我需要从 HDFS 上的文件中查找 Spark 流作业中的一些数据
此数据由批处理作业每天提取一次。
是否有针对此类任务的“设计模式”?
- 如何在
之后立即重新加载内存中的数据(散列图)
每日更新?
- 如何在查找数据为
时持续提供流作业
正在提取?
一种可能的方法是放弃本地数据结构并改用有状态流。假设您有名为 mainStream
:
的主数据流
val mainStream: DStream[T] = ???
接下来您可以创建另一个读取查找数据的流:
val lookupStream: DStream[(K, V)] = ???
和一个可用于更新状态的简单函数
def update(
current: Seq[V], // A sequence of values for a given key in the current batch
prev: Option[V] // Value for a given key from in the previous state
): Option[V] = {
current
.headOption // If current batch is not empty take first element
.orElse(prev) // If it is empty (None) take previous state
}
这两块可以用来创建状态:
val state = lookup.updateStateByKey(update)
剩下的就是按键 mainStream
和连接数据:
def toPair(t: T): (K, T) = ???
mainStream.map(toPair).leftOuterJoin(state)
虽然从性能的角度来看,这可能不是最佳选择,但它利用了现有的体系结构,使您无需手动处理失效或故障恢复。
我需要从 HDFS 上的文件中查找 Spark 流作业中的一些数据
此数据由批处理作业每天提取一次。
是否有针对此类任务的“设计模式”?
- 如何在
之后立即重新加载内存中的数据(散列图) 每日更新? - 如何在查找数据为
时持续提供流作业 正在提取?
一种可能的方法是放弃本地数据结构并改用有状态流。假设您有名为 mainStream
:
val mainStream: DStream[T] = ???
接下来您可以创建另一个读取查找数据的流:
val lookupStream: DStream[(K, V)] = ???
和一个可用于更新状态的简单函数
def update(
current: Seq[V], // A sequence of values for a given key in the current batch
prev: Option[V] // Value for a given key from in the previous state
): Option[V] = {
current
.headOption // If current batch is not empty take first element
.orElse(prev) // If it is empty (None) take previous state
}
这两块可以用来创建状态:
val state = lookup.updateStateByKey(update)
剩下的就是按键 mainStream
和连接数据:
def toPair(t: T): (K, T) = ???
mainStream.map(toPair).leftOuterJoin(state)
虽然从性能的角度来看,这可能不是最佳选择,但它利用了现有的体系结构,使您无需手动处理失效或故障恢复。