How to split a DStream given a matcher

Given

val dstream = ssc.createStream(..)

how can we get a set of bucketed/grouped/split DStreams out of it, like this:

val (s1, s2, s3): (DStream[_], DStream[_], DStream[_]) =
  dstream.map {
    case <cond1> => bucket1Value
    case <cond2> => bucket2Value
    case _       => bucket3Value
  }.<some bucketing/grouping operation>

RE: possible duplicate. That is a completely different question: the other one is about RDDs, not DStreams!

Answering my own question. If anyone (anyone?) has a suggestion for doing this directly, in a single operation, it would be gladly accepted.

So here is a solution, although not an elegant one: run one flatMap over the source stream per bucket.

// One flatMap per bucket: Some(..) keeps a record, None drops it.
val s1 = dstream.flatMap {
  case r if <cond1> => Some(bucket1Value)
  case _            => None
}
val s2 = dstream.flatMap {
  case r if <cond2> => Some(bucket2Value)
  case _            => None
}
// The third bucket takes whatever matched neither condition.
val s3 = dstream.flatMap {
  case r if !<cond1> && !<cond2> => Some(bucket3Value)
  case _                         => None
}
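
A slightly tidier variant of the same idea, as a rough sketch only: tag each record with a bucket id once, cache the tagged stream, and filter it per bucket. The object name SplitDStream, the helpers classify and split, the Int element type, and the concrete conditions are all assumptions made up for illustration; they stand in for <cond1>, <cond2> and the bucket values above. It still runs one filter per bucket, so it is not the single direct operation asked for, but the matcher lives in one place and is evaluated once per record when the cached stream is reused.

```scala
import org.apache.spark.streaming.dstream.DStream

object SplitDStream {
  // Hypothetical matcher: returns a bucket id (1, 2 or 3).
  // The conditions here are placeholders for <cond1> and <cond2>.
  def classify(x: Int): Int =
    if (x < 0) 1        // stands in for <cond1>
    else if (x == 0) 2  // stands in for <cond2>
    else 3              // everything else

  // Tag every record with its bucket id, cache the tagged stream,
  // then derive one DStream per bucket by filtering on the tag.
  def split(dstream: DStream[Int]): (DStream[Int], DStream[Int], DStream[Int]) = {
    val tagged = dstream.map(x => (classify(x), x)).cache()
    def only(bucket: Int): DStream[Int] =
      tagged.filter(_._1 == bucket).map(_._2)
    (only(1), only(2), only(3))
  }
}
```

Usage would then look like val (s1, s2, s3) = SplitDStream.split(dstream), assuming dstream is a DStream[Int].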