Spark Streaming - 基于过滤器参数拆分输入流的最佳方式
Spark Streaming - Best way to Split Input Stream based on filter Param
我目前正在尝试创建某种监控解决方案 - 一些数据被写入 kafka,我使用 Spark Streaming 读取这些数据并进行处理。
为了预处理机器学习和异常检测的数据,我想根据一些过滤器参数拆分流。到目前为止,我了解到 DStreams 本身不能拆分为多个流。
我主要面临的问题是许多算法(如 KMeans)仅采用连续数据而不是离散数据,例如url 或其他一些字符串。
我的理想要求是:
- 从kafka读取数据,根据读取的内容生成字符串列表
- 根据该字符串列表生成多个流 -(拆分流、过滤流或任何最佳实践)
- 使用这些流为每个流训练不同的模型以获得基线,然后将之后出现的所有内容与基线进行比较
我很乐意得到任何关于如何解决我的问题的建议。我无法想象 Spark 中没有涵盖这种情况 - 但是直到现在我还没有发现可行的解决方案。
我认为使用过滤器和映射从原始 DStream 创建派生 DStream 就足够了:
val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e))
val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e))
请注意,这些 filter
和 map
步骤可以合并为一个 collect
步骤(不要与将数据传输到驱动程序的无参数 RDD.collect 相混淆!!!)
val featuresStream = originalDStream.transform(rdd =>
rdd.collect{case (a,b,c,d,e) if c=="client" => Vectors.parse(a)}
)
streamingKMeans.trainOn(featuresStream)
我们还可以将一组动态的过滤后的 DStream 保存到某个集合中。这里我们使用一个映射,其中包含我们用来过滤的键:
originalDStream.cache() // important for performance when a DStream is branched out.
// filterKeys: Set[String]
val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => (getKey(e)==key)))
// do something with the different DStreams in this structure ...
这些片段是要用实际逻辑完成的代码示例。
我目前正在尝试创建某种监控解决方案 - 一些数据被写入 kafka,我使用 Spark Streaming 读取这些数据并进行处理。
为了预处理机器学习和异常检测的数据,我想根据一些过滤器参数拆分流。到目前为止,我了解到 DStreams 本身不能拆分为多个流。
我主要面临的问题是许多算法(如 KMeans)仅采用连续数据而不是离散数据,例如url 或其他一些字符串。
我的理想要求是:
- 从kafka读取数据,根据读取的内容生成字符串列表
- 根据该字符串列表生成多个流 -(拆分流、过滤流或任何最佳实践)
- 使用这些流为每个流训练不同的模型以获得基线,然后将之后出现的所有内容与基线进行比较
我很乐意得到任何关于如何解决我的问题的建议。我无法想象 Spark 中没有涵盖这种情况 - 但是直到现在我还没有发现可行的解决方案。
我认为使用过滤器和映射从原始 DStream 创建派生 DStream 就足够了:
val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e))
val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e))
请注意,这些 filter
和 map
步骤可以合并为一个 collect
步骤(不要与将数据传输到驱动程序的无参数 RDD.collect 相混淆!!!)
val featuresStream = originalDStream.transform(rdd =>
rdd.collect{case (a,b,c,d,e) if c=="client" => Vectors.parse(a)}
)
streamingKMeans.trainOn(featuresStream)
我们还可以将一组动态的过滤后的 DStream 保存到某个集合中。这里我们使用一个映射,其中包含我们用来过滤的键:
originalDStream.cache() // important for performance when a DStream is branched out.
// filterKeys: Set[String]
val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => (getKey(e)==key)))
// do something with the different DStreams in this structure ...
这些片段是要用实际逻辑完成的代码示例。