将火花数据拆分为分区并将这些分区并行写入磁盘
Splitting spark data into partitions and writing those partitions to disk in parallel
问题概述:假设我在 AWS 的 EMR 集群上使用 spark 处理了 300+ GB 的数据。此数据具有三个属性,用于在 Hive 中使用的文件系统上进行分区:日期、小时和(比方说)anotherAttr。我想以最小化写入文件数的方式将此数据写入 fs。
我现在正在做的是获取日期、小时、anotherAttr 的不同组合,以及组成组合的行数。我将它们收集到驱动程序上的一个列表中,并遍历列表,为每个组合构建一个新的 DataFrame,使用行数重新分区该 DataFrame 以估计文件大小,并使用 DataFrameWriter 将文件写入磁盘,.orc
完成它。
出于组织原因,我们没有使用 Parquet。
此方法效果相当好,解决了使用 Hive 而不是 Spark 的下游团队看不到大量文件导致的性能问题的问题。例如,如果我使用整个 300 GB DataFrame,对 1000 个分区(在 spark 中)和相关列进行重新分区,并将其转储到磁盘,所有转储都是并行进行的,并在大约 9 分钟内完成整个过程。但这会为较大的分区增加多达 1000 个文件,这会破坏 Hive 性能。或者它破坏了某种性能,老实说不能 100% 确定是什么。我刚刚被要求将文件数量保持在尽可能低的水平。使用我正在使用的方法,我可以将文件保持为我想要的任何大小(无论如何相对接近),但是没有并行性并且需要大约 45 分钟才能 运行,主要是等待文件写入。
在我看来,因为一些源行和一些目标行之间存在一对一的关系,而且因为我可以将数据组织成非重叠的 "folders"(Hive 的分区) ,我应该能够以这样的方式组织我的 code/DataFrames ,以便我可以要求 spark 并行写入所有目标文件。有人对如何攻击它有建议吗?
我测试过但不起作用的东西:
使用 Scala 并行集合启动写入。无论 spark 对 DataFrame 做了什么,它都没有很好地分离任务,一些机器遇到了大量的垃圾收集问题。
DataFrame.map - 我试图映射独特组合的 DataFrame,并从那里开始写入,但无法访问我实际需要的数据的 DataFrame在那个 map
- 执行器上的 DataFrame 引用为空。
DataFrame.mapPartitions - 一个非初学者,无法从 mapPartitions
中想出任何想法来做我想做的事
'partition' 一词在这里也不是特别有用,因为它既指的是 spark 按某些标准拆分数据的概念,也指数据将在磁盘上为 Hive 组织的方式。我想我在上面的用法中已经很清楚了。因此,如果我想出一个完美的解决方案来解决这个问题,那就是我可以基于三个属性创建一个具有 1000 个分区的 DataFrame 以进行快速查询,然后从中创建另一个 DataFrame 集合,每个 DataFrame 都有一个独特的组合这些属性,重新分区(在 spark 中,但对于 Hive),分区数适合于它包含的数据的大小。大多数 DataFrames 将有 1 个分区,少数将有多达 10 个。文件应该约为 3 GB,并且我们的 EMR 集群具有比每个执行程序更多的 RAM,因此我们不应该看到这些 "large"分区。
创建 DataFrame 列表并重新分区每个 DataFrame 后,我可以让 spark 将它们全部并行写入磁盘。
在 spark 中可以实现这样的功能吗?
有一件事我在概念上不清楚:假设我有
val x = spark.sql("select * from source")
和
val y = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr")
和
val z = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr2")
y
在多大程度上与 z
是不同的 DataFrame?如果我重新分区 y
,洗牌对 z
和 x
有什么影响?
这条语句:
I collect them into a List on the driver, and iterate over the list,
building a new DataFrame for each combination, repartitioning that
DataFrame using the number of rows to guestimate file size, and
writing the files to disk with DataFrameWriter, .orc finishing it off.
在 Spark 方面完全偏离了光束。收集到驱动程序从来都不是一个好方法,卷和 OOM 问题以及您的方法中的延迟很高。
使用下面的方法来简化并获得 Spark 的并行性,从而为您的老板节省时间和金钱:
df.repartition(cols...)...write.partitionBy(cols...)...
通过 repartition
进行洗牌,partitionBy
不会洗牌。
就这么简单,利用了 Spark 的默认并行性。
我们(几乎)遇到了同样的问题,我们最终直接使用 RDD(而不是 DataFrames)并实现了我们自己的分区机制(通过扩展 org.apache.spark.Partitioner)
详细信息:我们正在读取来自 Kafka 的 JSON 消息。 JSON 应按 customerid/date/more 字段分组并使用 Parquet 格式在 Hadoop 中编写,而不会创建太多小文件。
步骤是(简化版):
a) 从Kafka中读取消息,并将其转化为RDD[(GroupBy, Message)]的结构。 GroupBy 是一个案例 class 包含所有用于分组的字段。
b) 使用 reduceByKeyLocally 转换并获取每个组的指标图(messages/messages size/etc)——例如 Map[GroupBy, GroupByMetrics]
c) 创建一个 GroupPartitioner,它使用之前收集的指标(以及一些输入参数,如所需的 Parquet 大小等)来计算应该为每个 GroupBy 对象创建多少个分区。基本上我们正在扩展 org.apache.spark.Partitioner 并覆盖 numPartitions 和 getPartition(key: Any)
d) 我们使用之前定义的分区程序从 a) 中对 RDD 进行分区:newPartitionedRdd = rdd.partitionBy(ourCustomGroupByPartitioner)
e)用两个参数调用spark.sparkContext.runJob:第一个是在d处分区的RDD),第二个是自定义函数(func: (TaskContext, Iterator[T]),它将写入从 Iterator[T] 获取到 Hadoop/Parquet
的消息
假设我们有 1 亿条消息,这样分组
第 1 组 - 200 万
第 2 组 - 8000 万
第 3 组 - 1800 万
我们决定每个分区必须使用 150 万条消息来获取大于 500MB 的 Parquet 文件。我们最终会为 Group1 分配 2 个分区,为 Group2 分配 54 个分区,为 Group3 分配 12 个分区。
问题概述:假设我在 AWS 的 EMR 集群上使用 spark 处理了 300+ GB 的数据。此数据具有三个属性,用于在 Hive 中使用的文件系统上进行分区:日期、小时和(比方说)anotherAttr。我想以最小化写入文件数的方式将此数据写入 fs。
我现在正在做的是获取日期、小时、anotherAttr 的不同组合,以及组成组合的行数。我将它们收集到驱动程序上的一个列表中,并遍历列表,为每个组合构建一个新的 DataFrame,使用行数重新分区该 DataFrame 以估计文件大小,并使用 DataFrameWriter 将文件写入磁盘,.orc
完成它。
出于组织原因,我们没有使用 Parquet。
此方法效果相当好,解决了使用 Hive 而不是 Spark 的下游团队看不到大量文件导致的性能问题的问题。例如,如果我使用整个 300 GB DataFrame,对 1000 个分区(在 spark 中)和相关列进行重新分区,并将其转储到磁盘,所有转储都是并行进行的,并在大约 9 分钟内完成整个过程。但这会为较大的分区增加多达 1000 个文件,这会破坏 Hive 性能。或者它破坏了某种性能,老实说不能 100% 确定是什么。我刚刚被要求将文件数量保持在尽可能低的水平。使用我正在使用的方法,我可以将文件保持为我想要的任何大小(无论如何相对接近),但是没有并行性并且需要大约 45 分钟才能 运行,主要是等待文件写入。
在我看来,因为一些源行和一些目标行之间存在一对一的关系,而且因为我可以将数据组织成非重叠的 "folders"(Hive 的分区) ,我应该能够以这样的方式组织我的 code/DataFrames ,以便我可以要求 spark 并行写入所有目标文件。有人对如何攻击它有建议吗?
我测试过但不起作用的东西:
使用 Scala 并行集合启动写入。无论 spark 对 DataFrame 做了什么,它都没有很好地分离任务,一些机器遇到了大量的垃圾收集问题。
DataFrame.map - 我试图映射独特组合的 DataFrame,并从那里开始写入,但无法访问我实际需要的数据的 DataFrame在那个
map
- 执行器上的 DataFrame 引用为空。DataFrame.mapPartitions - 一个非初学者,无法从 mapPartitions
中想出任何想法来做我想做的事
'partition' 一词在这里也不是特别有用,因为它既指的是 spark 按某些标准拆分数据的概念,也指数据将在磁盘上为 Hive 组织的方式。我想我在上面的用法中已经很清楚了。因此,如果我想出一个完美的解决方案来解决这个问题,那就是我可以基于三个属性创建一个具有 1000 个分区的 DataFrame 以进行快速查询,然后从中创建另一个 DataFrame 集合,每个 DataFrame 都有一个独特的组合这些属性,重新分区(在 spark 中,但对于 Hive),分区数适合于它包含的数据的大小。大多数 DataFrames 将有 1 个分区,少数将有多达 10 个。文件应该约为 3 GB,并且我们的 EMR 集群具有比每个执行程序更多的 RAM,因此我们不应该看到这些 "large"分区。
创建 DataFrame 列表并重新分区每个 DataFrame 后,我可以让 spark 将它们全部并行写入磁盘。
在 spark 中可以实现这样的功能吗?
有一件事我在概念上不清楚:假设我有
val x = spark.sql("select * from source")
和
val y = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr")
和
val z = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr2")
y
在多大程度上与 z
是不同的 DataFrame?如果我重新分区 y
,洗牌对 z
和 x
有什么影响?
这条语句:
I collect them into a List on the driver, and iterate over the list, building a new DataFrame for each combination, repartitioning that DataFrame using the number of rows to guestimate file size, and writing the files to disk with DataFrameWriter, .orc finishing it off.
在 Spark 方面完全偏离了光束。收集到驱动程序从来都不是一个好方法,卷和 OOM 问题以及您的方法中的延迟很高。
使用下面的方法来简化并获得 Spark 的并行性,从而为您的老板节省时间和金钱:
df.repartition(cols...)...write.partitionBy(cols...)...
通过 repartition
进行洗牌,partitionBy
不会洗牌。
就这么简单,利用了 Spark 的默认并行性。
我们(几乎)遇到了同样的问题,我们最终直接使用 RDD(而不是 DataFrames)并实现了我们自己的分区机制(通过扩展 org.apache.spark.Partitioner)
详细信息:我们正在读取来自 Kafka 的 JSON 消息。 JSON 应按 customerid/date/more 字段分组并使用 Parquet 格式在 Hadoop 中编写,而不会创建太多小文件。
步骤是(简化版): a) 从Kafka中读取消息,并将其转化为RDD[(GroupBy, Message)]的结构。 GroupBy 是一个案例 class 包含所有用于分组的字段。
b) 使用 reduceByKeyLocally 转换并获取每个组的指标图(messages/messages size/etc)——例如 Map[GroupBy, GroupByMetrics]
c) 创建一个 GroupPartitioner,它使用之前收集的指标(以及一些输入参数,如所需的 Parquet 大小等)来计算应该为每个 GroupBy 对象创建多少个分区。基本上我们正在扩展 org.apache.spark.Partitioner 并覆盖 numPartitions 和 getPartition(key: Any)
d) 我们使用之前定义的分区程序从 a) 中对 RDD 进行分区:newPartitionedRdd = rdd.partitionBy(ourCustomGroupByPartitioner)
e)用两个参数调用spark.sparkContext.runJob:第一个是在d处分区的RDD),第二个是自定义函数(func: (TaskContext, Iterator[T]),它将写入从 Iterator[T] 获取到 Hadoop/Parquet
的消息假设我们有 1 亿条消息,这样分组
第 1 组 - 200 万
第 2 组 - 8000 万
第 3 组 - 1800 万 我们决定每个分区必须使用 150 万条消息来获取大于 500MB 的 Parquet 文件。我们最终会为 Group1 分配 2 个分区,为 Group2 分配 54 个分区,为 Group3 分配 12 个分区。