SPARK - 加入两个数据流 - 维护缓存

SPARK - Joining two data streams - maintenance of cache

很明显,Spark Streaming 中开箱即用的连接功能并不能保证很多现实生活中的用例。原因是它只加入微批​​ RDD 中包含的数据。

用例是连接来自两个 kafka 流的数据,并用它在 spark 中的 stream2 中对应的对象丰富 stream1 中的每个对象,并将其保存到 HBase。

实施会

这个问题是关于 Spark streaming 的探索,API 找到一种方法来实现上面提到的。

您可以将传入的 RDD 加入到其他 RDD 中——而不仅仅是那个微批次中的那些。基本上,您会保留一个 "running total" RDD 并填写如下内容:

var globalRDD1: RDD[...] = sc.emptyRDD
var globalRDD2: RDD[...] = sc.emptyRDD

dstream1.foreachRDD(rdd => if (!rdd.isEmpty) globalRDD1 = globalRDD1.union(rdd))
dstream2.foreachRDD(rdd => if (!rdd.isEmpty) {
  globalRDD2 = globalRDD2.union(rdd))
  globalRDD1.join(globalRDD2).foreach(...) // etc, etc
}

一个好的开始是研究 mapWithState。这是 updateStateByKey 更有效的替代品。这些是在 PairDStreamFunction 上定义的,因此假设您在 stream2 中类型为 V 的对象由类型为 K 的某个键标识,您的第一点将如下所示:

def stream2: DStream[(K, V)] = ???

def maintainStream2Objects(key: K, value: Option[V], state: State[V]): (K, V) = {
  value.foreach(state.update(_))
  (key, state.get())
}

val spec = StateSpec.function(maintainStream2Objects)

val stream2State = stream2.mapWithState(spec)

stream2State 现在是一个流,其中每个批次包含 (K, V) 对以及每个键的最新值。您可以在此流上进行连接,并 stream1 为您的第二点执行进一步的逻辑。