SPARK - 加入两个数据流 - 维护缓存
SPARK - Joining two data streams - maintenance of cache
很明显,Spark Streaming 中开箱即用的连接功能并不能保证很多现实生活中的用例。原因是它只加入微批 RDD 中包含的数据。
用例是连接来自两个 kafka 流的数据,并用它在 spark 中的 stream2 中对应的对象丰富 stream1 中的每个对象,并将其保存到 HBase。
实施会
在内存中维护来自 stream2 的对象的数据集,在收到对象时添加或替换对象
对于stream1中的每个元素,访问缓存以从stream2中找到匹配的对象,如果找到匹配则保存到HBase,否则将其放回kafka流。
这个问题是关于 Spark streaming 的探索,API 找到一种方法来实现上面提到的。
您可以将传入的 RDD
加入到其他 RDD
中——而不仅仅是那个微批次中的那些。基本上,您会保留一个 "running total" RDD
并填写如下内容:
var globalRDD1: RDD[...] = sc.emptyRDD
var globalRDD2: RDD[...] = sc.emptyRDD
dstream1.foreachRDD(rdd => if (!rdd.isEmpty) globalRDD1 = globalRDD1.union(rdd))
dstream2.foreachRDD(rdd => if (!rdd.isEmpty) {
globalRDD2 = globalRDD2.union(rdd))
globalRDD1.join(globalRDD2).foreach(...) // etc, etc
}
一个好的开始是研究 mapWithState
。这是 updateStateByKey
更有效的替代品。这些是在 PairDStreamFunction
上定义的,因此假设您在 stream2 中类型为 V
的对象由类型为 K
的某个键标识,您的第一点将如下所示:
def stream2: DStream[(K, V)] = ???
def maintainStream2Objects(key: K, value: Option[V], state: State[V]): (K, V) = {
value.foreach(state.update(_))
(key, state.get())
}
val spec = StateSpec.function(maintainStream2Objects)
val stream2State = stream2.mapWithState(spec)
stream2State
现在是一个流,其中每个批次包含 (K, V)
对以及每个键的最新值。您可以在此流上进行连接,并 stream1
为您的第二点执行进一步的逻辑。
很明显,Spark Streaming 中开箱即用的连接功能并不能保证很多现实生活中的用例。原因是它只加入微批 RDD 中包含的数据。
用例是连接来自两个 kafka 流的数据,并用它在 spark 中的 stream2 中对应的对象丰富 stream1 中的每个对象,并将其保存到 HBase。
实施会
在内存中维护来自 stream2 的对象的数据集,在收到对象时添加或替换对象
对于stream1中的每个元素,访问缓存以从stream2中找到匹配的对象,如果找到匹配则保存到HBase,否则将其放回kafka流。
这个问题是关于 Spark streaming 的探索,API 找到一种方法来实现上面提到的。
您可以将传入的 RDD
加入到其他 RDD
中——而不仅仅是那个微批次中的那些。基本上,您会保留一个 "running total" RDD
并填写如下内容:
var globalRDD1: RDD[...] = sc.emptyRDD
var globalRDD2: RDD[...] = sc.emptyRDD
dstream1.foreachRDD(rdd => if (!rdd.isEmpty) globalRDD1 = globalRDD1.union(rdd))
dstream2.foreachRDD(rdd => if (!rdd.isEmpty) {
globalRDD2 = globalRDD2.union(rdd))
globalRDD1.join(globalRDD2).foreach(...) // etc, etc
}
一个好的开始是研究 mapWithState
。这是 updateStateByKey
更有效的替代品。这些是在 PairDStreamFunction
上定义的,因此假设您在 stream2 中类型为 V
的对象由类型为 K
的某个键标识,您的第一点将如下所示:
def stream2: DStream[(K, V)] = ???
def maintainStream2Objects(key: K, value: Option[V], state: State[V]): (K, V) = {
value.foreach(state.update(_))
(key, state.get())
}
val spec = StateSpec.function(maintainStream2Objects)
val stream2State = stream2.mapWithState(spec)
stream2State
现在是一个流,其中每个批次包含 (K, V)
对以及每个键的最新值。您可以在此流上进行连接,并 stream1
为您的第二点执行进一步的逻辑。