Spark doesn't let me count my joined dataframes
I'm new to Spark jobs and I've run into the following problem.
When I run a count on any of the newly joined dataframes, the job runs for a very long time and spills memory to disk. Is there a logical error here?
// pass spark configuration
val conf = new SparkConf()
.setMaster(threadMaster)
.setAppName(appName)
// Create a new spark context
val sc = new SparkContext(conf)
// Specify a SQL context and pass in the spark context we created
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create three dataframes for the sent, clicked and failed files. Mark them as raw, since they will be renamed
val dfSentRaw = sqlContext.read.parquet(inputPathSent)
val dfClickedRaw = sqlContext.read.parquet(inputPathClicked)
val dfFailedRaw = sqlContext.read.parquet(inputPathFailed)
// Rename the columns to avoid ambiguity when accessing the fields later
val dfSent = dfSentRaw.withColumnRenamed("customer_id", "sent__customer_id")
.withColumnRenamed("campaign_id", "sent__campaign_id")
.withColumnRenamed("ced_email", "sent__ced_email")
.withColumnRenamed("event_captured_dt", "sent__event_captured_dt")
.withColumnRenamed("riid", "sent__riid")
val dfClicked = dfClickedRaw.withColumnRenamed("customer_id", "clicked__customer_id")
.withColumnRenamed("event_captured_dt", "clicked__event_captured_dt")
val dfFailed = dfFailedRaw.withColumnRenamed("customer_id", "failed__customer_id")
// LEFT Join with CLICKED on two fields, customer_id and campaign_id
val dfSentClicked = dfSent.join(dfClicked, dfSent("sent__customer_id") === dfClicked("clicked__customer_id")
&& dfSent("sent__campaign_id") === dfClicked("campaign_id"), "left")
dfSentClicked.count() //THIS WILL NOT WORK
val dfJoined = dfSentClicked.join(dfFailed, dfSentClicked("sent__customer_id") === dfFailed("failed__customer_id")
&& dfSentClicked("sent__campaign_id") === dfFailed("campaign_id"), "left")
Why can't these two/three dataframes be counted anymore? Did I mess up some index by renaming the columns?
Thanks!
That count call is the only actual materialization of your Spark job here, so it's not really the count that is the problem but the shuffle being done for the join right before it. You don't have enough memory to do the join without spilling to disk. Spilling to disk in a shuffle is a very easy way to make your Spark jobs take forever =).
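If you want to see that shuffle without actually running anything, you can print the query plan for the joined dataframe from your snippet (just a quick sanity check, nothing here is required):
// Prints the logical and physical plans without triggering a job; the
// shuffle (Exchange) for the join shows up in the physical plan.
dfSentClicked.explain(true)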
One thing that really helps prevent spilling in shuffles is having more partitions, so that less data goes through the shuffle at any given time. You can set spark.sql.shuffle.partitions, which controls the number of partitions Spark SQL uses for joins and aggregations. It defaults to 200, so you can try a higher setting. http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
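For example (800 is just an illustrative value, not a recommendation for your data), you could set it on the sqlContext you already created, before building the joins:
// Illustrative only: use more, smaller shuffle partitions for joins/aggregations
sqlContext.setConf("spark.sql.shuffle.partitions", "800")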
You could also increase the heap size of your local Spark allocation and/or increase the fraction of memory usable for shuffles by increasing spark.shuffle.memoryFraction (default 0.4) and decreasing spark.storage.memoryFraction (default 0.6). The storage fraction is used, for example, when you make .cache calls, which you probably don't care about here.
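A minimal sketch of that on the SparkConf from your snippet (the 0.5/0.3 split is only illustrative; the heap size itself is usually set with --driver-memory / --executor-memory on spark-submit rather than in code):
// Illustrative only: shift heap from caching towards shuffle buffers.
// Set these before the SparkContext is created.
val conf = new SparkConf()
  .setMaster(threadMaster)
  .setAppName(appName)
  .set("spark.shuffle.memoryFraction", "0.5")  // more room for join/aggregation buffers
  .set("spark.storage.memoryFraction", "0.3")  // less room for .cache()/.persist() data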
If you're strongly inclined to avoid the spills entirely, you can turn spilling off by setting spark.shuffle.spill to false. I believe this will throw an exception if you run out of memory and need to spill, instead of silently taking forever, and it could help you tune your memory allocation faster.
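Sketch of that setting, again on the same SparkConf (I'd only use it while debugging, since it trades silent slowness for hard failures):
// Illustrative only: fail fast with an exception instead of spilling shuffle data to disk
val conf = new SparkConf()
  .setMaster(threadMaster)
  .setAppName(appName)
  .set("spark.shuffle.spill", "false")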