Spark 将一个 DStream 拆分为多个 RDD
Spark splitting a DStream into several RDDs
同样的问题也适用于将一个RDD拆分成几个新的RDD。
一个 DStream 或 RDD 包含几个不同的 case classes,我需要根据 case class 类型将它们变成单独的 RDD。
我知道
val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" }
或
val newRDD = rdd.filter {
a => a match {
case _: CC1 => true
case _ => false
}
}
但这需要对原始 RDD 进行多次运行,每种情况运行一次 class 类型。
- 上面的匹配过滤一定有更简洁的方法吗?
- 有没有办法通过一次并行传递将一个 rdd 按元素类型分成几个?
1) 一种更简洁的过滤给定类型的方法是使用 rdd.collect(PartialFunction[T,U])
相当于
val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" }
将是:
val newRDD = rdd.collect{case c:CaseClass1 => c}
它甚至可以与额外的过滤和转换相结合:
val budgetRDD = rdd.collect{case c:CaseClass1 if (c.customer == "important") => c.getBudget}
rdd.collect(p:PartialFunction[T,U])
不应与 rdd.collect()
混淆,后者将数据返回给驱动程序。
2) 要拆分 RDD(或 DStream),filter
是可行的方法。必须记住,RDD 是一种分布式集合。 Filter 将允许您在集群上并行地将函数应用于该分布式集合的子集。
从原始 RDD 创建 2 个或更多 RDD 的结构将导致 1 对多洗牌阶段,这将大大增加成本。
看起来 rdd.filter
我在长格式的正确轨道上。稍微简洁一点的版本是:
val newRDD = rdd.filter { case _: CC1 => true ; case _ => false }
您不能遗漏 case _ => false
,否则 class 的测试并不详尽,您会得到错误。我无法让收集正常工作。
@maasg 因关于执行单独的过滤器传递而不是破解一种在一次传递中拆分输入的方法的正确答案而获得赞誉。
同样的问题也适用于将一个RDD拆分成几个新的RDD。
一个 DStream 或 RDD 包含几个不同的 case classes,我需要根据 case class 类型将它们变成单独的 RDD。
我知道
val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" }
或
val newRDD = rdd.filter {
a => a match {
case _: CC1 => true
case _ => false
}
}
但这需要对原始 RDD 进行多次运行,每种情况运行一次 class 类型。
- 上面的匹配过滤一定有更简洁的方法吗?
- 有没有办法通过一次并行传递将一个 rdd 按元素类型分成几个?
1) 一种更简洁的过滤给定类型的方法是使用 rdd.collect(PartialFunction[T,U])
相当于
val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" }
将是:
val newRDD = rdd.collect{case c:CaseClass1 => c}
它甚至可以与额外的过滤和转换相结合:
val budgetRDD = rdd.collect{case c:CaseClass1 if (c.customer == "important") => c.getBudget}
rdd.collect(p:PartialFunction[T,U])
不应与 rdd.collect()
混淆,后者将数据返回给驱动程序。
2) 要拆分 RDD(或 DStream),filter
是可行的方法。必须记住,RDD 是一种分布式集合。 Filter 将允许您在集群上并行地将函数应用于该分布式集合的子集。
从原始 RDD 创建 2 个或更多 RDD 的结构将导致 1 对多洗牌阶段,这将大大增加成本。
看起来 rdd.filter
我在长格式的正确轨道上。稍微简洁一点的版本是:
val newRDD = rdd.filter { case _: CC1 => true ; case _ => false }
您不能遗漏 case _ => false
,否则 class 的测试并不详尽,您会得到错误。我无法让收集正常工作。
@maasg 因关于执行单独的过滤器传递而不是破解一种在一次传递中拆分输入的方法的正确答案而获得赞誉。