分组数据的Spark并行处理
Spark parallel processing of grouped data
最初,我有很多数据。但是使用 spark-SQL 尤其是 groupBy 可以将其缩小到可管理的大小。 (适合单个节点的 RAM)
我如何在所有组(分布在我的节点中)上执行功能(并行)?
如何确保单个组的数据收集到单个节点?例如。我可能想使用 local matrix
进行计算,但不想 运行 出现有关数据局部性的错误。
假设你有 x 没有。执行者的数量(在您的情况下,每个节点可能有 1 个执行者)。并且您希望以这样的方式对键上的数据进行分区,使每个键都落入一个唯一的存储桶中,这就像一个完美的 partitioner.There 不会这样做的通用方法,但如果有一些固有的 distribution/logic 特定于您的数据,则有可能实现这一点。
我处理过一个特定案例,我发现 Spark 的内置哈希分区器在分发密钥方面做得不好 uniformly.So 我使用 Guava 编写了一个自定义分区器,如下所示:
class FooPartitioner(partitions: Int) extends org.apache.spark.HashPartitioner(partitions: Int) {
override def getPartition(key: Any): Int = {
val hasherer = Hashing.murmur3_32().newHasher()
Hashing.consistentHash(
key match {
case i: Int => hasherer.putInt(i).hash.asInt()
case _ => key.hashCode
},PARTITION_SIZE)
}
}
然后我将这个分区程序实例作为参数添加到我正在使用的 combineBy 中,以便生成的 rdd 以这种方式进行分区。
这很好地将数据分发到 x 个桶,但我想不能保证每个桶只有 1 个键。
如果您使用的是 Spark 1.6 并使用数据帧,您可以像这样定义一个 udf
val hasher = udf((i:Int)=>Hashing.consistentHash(Hashing.murmur3_32().newHasher().putInt(i) .hash.asInt(),PARTITION_SIZE))
并做 dataframe.repartition(hasher(keyThatYouAreUsing))
希望这能提供一些入门提示。
我从 Efficient UD(A)Fs with PySpark 找到了解决方案
这个博客
- mapPartitions 分割数据;
- udaf 将 spark 数据帧转换为 pandas 数据帧;
- 在 udaf 和 return 一个 pandas 数据帧中做你的数据 etl 逻辑;
- udaf 会将 pandas 数据帧转换为 spark 数据帧;
- toDF() 合并结果 spark 数据帧并像 SaveAsTable 一样做一些持久化;
df = df.repartition('guestid').rdd.mapPartitions(udf_calc).toDF()
最初,我有很多数据。但是使用 spark-SQL 尤其是 groupBy 可以将其缩小到可管理的大小。 (适合单个节点的 RAM)
我如何在所有组(分布在我的节点中)上执行功能(并行)?
如何确保单个组的数据收集到单个节点?例如。我可能想使用 local matrix
进行计算,但不想 运行 出现有关数据局部性的错误。
假设你有 x 没有。执行者的数量(在您的情况下,每个节点可能有 1 个执行者)。并且您希望以这样的方式对键上的数据进行分区,使每个键都落入一个唯一的存储桶中,这就像一个完美的 partitioner.There 不会这样做的通用方法,但如果有一些固有的 distribution/logic 特定于您的数据,则有可能实现这一点。
我处理过一个特定案例,我发现 Spark 的内置哈希分区器在分发密钥方面做得不好 uniformly.So 我使用 Guava 编写了一个自定义分区器,如下所示: class FooPartitioner(partitions: Int) extends org.apache.spark.HashPartitioner(partitions: Int) {
override def getPartition(key: Any): Int = {
val hasherer = Hashing.murmur3_32().newHasher()
Hashing.consistentHash(
key match {
case i: Int => hasherer.putInt(i).hash.asInt()
case _ => key.hashCode
},PARTITION_SIZE)
}
}
然后我将这个分区程序实例作为参数添加到我正在使用的 combineBy 中,以便生成的 rdd 以这种方式进行分区。 这很好地将数据分发到 x 个桶,但我想不能保证每个桶只有 1 个键。
如果您使用的是 Spark 1.6 并使用数据帧,您可以像这样定义一个 udf
val hasher = udf((i:Int)=>Hashing.consistentHash(Hashing.murmur3_32().newHasher().putInt(i) .hash.asInt(),PARTITION_SIZE))
并做 dataframe.repartition(hasher(keyThatYouAreUsing))
希望这能提供一些入门提示。
我从 Efficient UD(A)Fs with PySpark 找到了解决方案 这个博客
- mapPartitions 分割数据;
- udaf 将 spark 数据帧转换为 pandas 数据帧;
- 在 udaf 和 return 一个 pandas 数据帧中做你的数据 etl 逻辑;
- udaf 会将 pandas 数据帧转换为 spark 数据帧;
- toDF() 合并结果 spark 数据帧并像 SaveAsTable 一样做一些持久化;
df = df.repartition('guestid').rdd.mapPartitions(udf_calc).toDF()