Spark Streaming - 基于过滤器参数拆分输入流的最佳方式

Spark Streaming - Best way to Split Input Stream based on filter Param

我目前正在尝试创建某种监控解决方案 - 一些数据被写入 kafka,我使用 Spark Streaming 读取这些数据并进行处理。

为了预处理机器学习和异常检测的数据,我想根据一些过滤器参数拆分流。到目前为止,我了解到 DStreams 本身不能拆分为多个流。

我主要面临的问题是许多算法(如 KMeans)仅采用连续数据而不是离散数据,例如url 或其他一些字符串。

我的理想要求是:

我很乐意得到任何关于如何解决我的问题的建议。我无法想象 Spark 中没有涵盖这种情况 - 但是直到现在我还没有发现可行的解决方案。

我认为使用过滤器和映射从原始 DStream 创建派生 DStream 就足够了:

val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e))
val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e))

请注意,这些 filtermap 步骤可以合并为一个 collect 步骤(不要与将数据传输到驱动程序的无参数 RDD.collect 相混淆!!!)

val featuresStream = originalDStream.transform(rdd => 
  rdd.collect{case (a,b,c,d,e) if c=="client" => Vectors.parse(a)}
)
streamingKMeans.trainOn(featuresStream)

我们还可以将一组动态的过滤后的 DStream 保存到某个集合中。这里我们使用一个映射,其中包含我们用来过滤的键:

originalDStream.cache() // important for performance when a DStream is branched out.
// filterKeys: Set[String] 
val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => (getKey(e)==key)))
// do something with the different DStreams in this structure ...

这些片段是要用实际逻辑完成的代码示例。