How to split a DStream given a matcher
Given
val dstream = ssc.createStream(..)
how can we obtain a group of bucketed/grouped/split DStreams from it, along the lines of:
val (s1, s2, s3): (DStream[_], DStream[_], DStream[_]) =
  dstream.map { in => in match {
    case r if <cond1> => bucket1Value
    case r if <cond2> => bucket2Value
    case _ => bucket3Value
  }}.<some bucketing/grouping operation>
RE: possible duplicate: that is a completely different question. The other one is about RDDs, not DStreams!
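Answering my own question, though if anyone (anyone?) has a suggestion for doing this directly in a single operation, it will be gladly accepted.
So here is a solution, albeit not an elegant one: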
val s1 = dstream.flatMap {
  case r if <cond1> => Some(bucket1Value)
  case _ => None
}
// Exclude cond1 so a record matching both conditions lands only in s1,
// matching the first-match-wins `match` in the question
val s2 = dstream.flatMap {
  case r if !<cond1> && <cond2> => Some(bucket2Value)
  case _ => None
}
val s3 = dstream.flatMap {
  case r if !<cond1> && !<cond2> => Some(bucket3Value)
  case _ => None
}
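As an alternative to the three separate `flatMap` passes above, one option is to evaluate the conditions once per record, tag each record with its bucket, and persist the tagged stream before filtering it, so the buckets stay disjoint and the upstream work is likely not recomputed for every output stream. The sketch below shows the idea on a plain Scala `Seq` so it runs without Spark; `Bucket`, `classify`, and `split` are hypothetical names, and the integer conditions merely stand in for `<cond1>`/`<cond2>`. On a DStream the equivalent steps would be `dstream.map(r => (classify(r), r)).persist()` followed by one `.filter(...).map(_._2)` per bucket.

```scala
// Hypothetical bucket labels standing in for bucket1Value/bucket2Value/bucket3Value
sealed trait Bucket
case object Bucket1 extends Bucket
case object Bucket2 extends Bucket
case object Bucket3 extends Bucket

object SplitSketch {
  // Classify each record exactly once; the first matching case wins,
  // mirroring the `match` in the question.
  def classify(n: Int): Bucket = n match {
    case x if x < 0      => Bucket1 // hypothetical <cond1>
    case x if x % 2 == 0 => Bucket2 // hypothetical <cond2>
    case _               => Bucket3
  }

  // Tag every record with its bucket once, then derive one view per bucket.
  // On a DStream (assumed usage, not run here):
  //   val tagged = dstream.map(r => (classify(r), r)).persist()
  //   val s1 = tagged.filter(_._1 == Bucket1).map(_._2)
  def split(records: Seq[Int]): (Seq[Int], Seq[Int], Seq[Int]) = {
    val tagged = records.map(r => (classify(r), r))
    def only(b: Bucket): Seq[Int] = tagged.filter(_._1 == b).map(_._2)
    (only(Bucket1), only(Bucket2), only(Bucket3))
  }

  def main(args: Array[String]): Unit = {
    // -4 satisfies both conditions but goes only to the first bucket
    val (s1, s2, s3) = split(Seq(-4, -1, 0, 3, 8, 7))
    println(s1)
    println(s2)
    println(s3)
  }
}
```

Each output is still a separate filtered view of the same tagged data, so this does not give a true single-pass split; it only avoids repeating the classification and keeps the buckets mutually exclusive.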