在 DataFrame 联合之后管理 Spark 分区
Managing Spark partitions after DataFrame unions
我有一个需要大量使用 unions
的 Spark 应用程序,因此我将在不同时间、不同情况下将大量数据帧联合在一起。我正在尝试尽可能高效地完成此 运行。我仍然是 Spark 的新手,我突然想到:
如果我有具有 X 个分区数 (numAPartitions
) 的 DataFrame 'A' (dfA
),我将其合并到 DataFrame 'B' (dfB
) 有 Y 个分区 (numBPartitions
),那么生成的联合 DataFrame (unionedDF
) 会是什么样子,结果是分区?
// How many partitions will unionedDF have?
// X * Y ?
// Something else?
val unionedDF : DataFrame = dfA.unionAll(dfB)
对我来说,理解这一点似乎非常重要,因为 Spark 性能 似乎 在很大程度上依赖于 DataFrames 采用的分区策略。因此,如果我左右联合 DataFrames,我需要确保我不断管理生成的联合 DataFrames 的分区。
我能想到的唯一事情(以便正确管理联合数据帧的分区)是重新分区它们,然后将数据帧持久化到memory/disk一旦我将它们合并:
val unionedDF : DataFrame = dfA.unionAll(dfB)
unionedDF.repartition(optimalNumberOfPartitions).persist(StorageLevel.MEMORY_AND_DISK)
这样,一旦它们合并,我们就将它们重新分区,以便将它们正确地分布在可用的 workers/executors 上,然后 persist(...)
调用告诉 Spark 不要驱逐 DataFrame从记忆中,所以我们可以继续工作。
问题是,重新分区听起来很昂贵,但可能不如替代方案(根本不管理分区)那么昂贵.是否有关于如何在 Spark-land 中有效管理工会的普遍接受的指南?
是的,分区对 spark 很重要。
我想知道您是否可以通过致电来自己找到答案:
yourResultedRDD.getNumPartitions()
Do I have to persist, post union?
一般来说,如果你要多次使用它,你必须 persist/cache 一个 RDD(不管它是联合的结果,还是土豆 :) )。这样做可以防止 spark 在内存中再次获取它,并且在某些情况下可以将应用程序的性能提高 15%!
例如,如果您打算只使用结果 RDD 一次,那么不持久化它是安全的。
Do I have to repartition?
既然你不关心找到分区的数量,你可以阅读我的memoryOverhead issue in Spark
关于分区数量如何影响您的应用程序。
一般来说,分区越多,每个执行器处理的数据块就越小。
回想一下,一个 worker 可以承载多个 executor,你可以把它想象成 worker 是你集群的 machine/node,而 executor 是一个进程(在核心中执行)运行在那个上面工人。
Isn't the Dataframe always in memory?
不是真的。这对于 spark 来说真的很可爱,因为当您处理 大数据 时,您不希望内存中存在不必要的东西,因为这会威胁到您应用程序的安全。
DataFrame 可以存储在 spark 为您创建的临时文件中,并仅在需要时加载到您的应用程序内存中。
Union只是将dataframe 1和dataframe 2中的分区数相加。两个dataframe具有相同的列数和相同的顺序来执行union操作。所以不用担心,如果两个数据帧中的分区列不同,将会有最多 m + n 个分区。
你不需要在加入后重新分区你的数据框,我的建议是使用合并代替重新分区,合并公共分区或合并一些小分区和avoid/reduce在分区内洗牌数据。
如果你 cache/persist dataframe 在每个 union 之后,你会降低性能并且沿袭不会被 cache/persist 打破,在这种情况下,垃圾收集将清理 cache/memory 在某些情况下大量内存密集型操作和重新计算将增加相同的计算时间,可能这次需要对 clear/removed 数据进行部分计算。
由于 spark 转换是惰性的,即; unionAll 是惰性操作并且 coalesce/repartition 也是惰性操作并在第一次操作时起作用,因此尝试在 8 的计数器等间隔后合并 unionall 结果并减少结果数据帧中的分区。如果您的解决方案中有大量内存密集型操作,请使用检查点来打破沿袭和存储数据。
我有一个需要大量使用 unions
的 Spark 应用程序,因此我将在不同时间、不同情况下将大量数据帧联合在一起。我正在尝试尽可能高效地完成此 运行。我仍然是 Spark 的新手,我突然想到:
如果我有具有 X 个分区数 (numAPartitions
) 的 DataFrame 'A' (dfA
),我将其合并到 DataFrame 'B' (dfB
) 有 Y 个分区 (numBPartitions
),那么生成的联合 DataFrame (unionedDF
) 会是什么样子,结果是分区?
// How many partitions will unionedDF have?
// X * Y ?
// Something else?
val unionedDF : DataFrame = dfA.unionAll(dfB)
对我来说,理解这一点似乎非常重要,因为 Spark 性能 似乎 在很大程度上依赖于 DataFrames 采用的分区策略。因此,如果我左右联合 DataFrames,我需要确保我不断管理生成的联合 DataFrames 的分区。
我能想到的唯一事情(以便正确管理联合数据帧的分区)是重新分区它们,然后将数据帧持久化到memory/disk一旦我将它们合并:
val unionedDF : DataFrame = dfA.unionAll(dfB)
unionedDF.repartition(optimalNumberOfPartitions).persist(StorageLevel.MEMORY_AND_DISK)
这样,一旦它们合并,我们就将它们重新分区,以便将它们正确地分布在可用的 workers/executors 上,然后 persist(...)
调用告诉 Spark 不要驱逐 DataFrame从记忆中,所以我们可以继续工作。
问题是,重新分区听起来很昂贵,但可能不如替代方案(根本不管理分区)那么昂贵.是否有关于如何在 Spark-land 中有效管理工会的普遍接受的指南?
是的,分区对 spark 很重要。
我想知道您是否可以通过致电来自己找到答案:
yourResultedRDD.getNumPartitions()
Do I have to persist, post union?
一般来说,如果你要多次使用它,你必须 persist/cache 一个 RDD(不管它是联合的结果,还是土豆 :) )。这样做可以防止 spark 在内存中再次获取它,并且在某些情况下可以将应用程序的性能提高 15%!
例如,如果您打算只使用结果 RDD 一次,那么不持久化它是安全的。
Do I have to repartition?
既然你不关心找到分区的数量,你可以阅读我的memoryOverhead issue in Spark 关于分区数量如何影响您的应用程序。
一般来说,分区越多,每个执行器处理的数据块就越小。
回想一下,一个 worker 可以承载多个 executor,你可以把它想象成 worker 是你集群的 machine/node,而 executor 是一个进程(在核心中执行)运行在那个上面工人。
Isn't the Dataframe always in memory?
不是真的。这对于 spark 来说真的很可爱,因为当您处理 大数据 时,您不希望内存中存在不必要的东西,因为这会威胁到您应用程序的安全。
DataFrame 可以存储在 spark 为您创建的临时文件中,并仅在需要时加载到您的应用程序内存中。
Union只是将dataframe 1和dataframe 2中的分区数相加。两个dataframe具有相同的列数和相同的顺序来执行union操作。所以不用担心,如果两个数据帧中的分区列不同,将会有最多 m + n 个分区。
你不需要在加入后重新分区你的数据框,我的建议是使用合并代替重新分区,合并公共分区或合并一些小分区和avoid/reduce在分区内洗牌数据。
如果你 cache/persist dataframe 在每个 union 之后,你会降低性能并且沿袭不会被 cache/persist 打破,在这种情况下,垃圾收集将清理 cache/memory 在某些情况下大量内存密集型操作和重新计算将增加相同的计算时间,可能这次需要对 clear/removed 数据进行部分计算。
由于 spark 转换是惰性的,即; unionAll 是惰性操作并且 coalesce/repartition 也是惰性操作并在第一次操作时起作用,因此尝试在 8 的计数器等间隔后合并 unionall 结果并减少结果数据帧中的分区。如果您的解决方案中有大量内存密集型操作,请使用检查点来打破沿袭和存储数据。