当集群中 运行 时,Spark Scala FoldLeft 导致 StackOverflow
Spark Scala FoldLeft resulting in StackOverflow when run in the cluster
我正在使用以下代码,以便使用其行来重塑数据框以进行此重塑。
数据框包含产品更改其 ID 的日期,但为了将其与包含交易的其他巨大数据框连接起来,我需要一个定义有效 ID 范围的新列。
例如,如果产品 A 更改为产品 B,生效日期为 01/01,然后更改为产品 C,生效日期为 03/01,我需要将开始日期和结束日期放在同一行中,以便我可以将它与按产品有效 B(或 C)日期过滤的巨大交易数据框结合起来,这样我就可以将产品正确地重命名为它们的有效真实 ID。
另外一条资料,df_MPC大约800行,不会再增长了。
所以我正在尝试的方法(在开发环境中 运行 时有效)是 foldleft。
MPC 数据框的汇总版本为:
Product | Date | NewProd
A | 01/01/2018| B
B | 03/01/2018| C
Objective:
Product | Date | NewProd | OriginalProd | EndDate
A | 01/01/2018| B | A | 03/01
B | 03/01/2018| C | A | 31/12-9999
(OriginalProd 列是与交易数据框的最终连接所必需的)
导致Whosebug的代码如下:
var rowList = new ListBuffer[Row]()
val it = df_MPC_SOURCE.toLocalIterator()
while (it.hasNext) { rowList += it.next()}
val df_MPC_TRANSFORMED = rowList.reverse
.foldLeft(df_MPC_pre_edit_source: DataFrame)((acc, elem) => acc
.withColumn("EndDate",
when((col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE"))
&& col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
&& (col("N_PRODUCT_ID_NEW") === elem.getAs("N_PRODUCT_ID")),
elem.getAs("D_EFFECTIVE_CHANGE"))
.otherwise(col("EndDate")))
.withColumn("OriginalProd",
when((col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE"))
&& col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
&& (col("MPC_original") === elem.getAs("N_PRODUCT_ID_NEW")),
elem.getAs("N_PRODUCT_ID"))
.otherwise(col("OriginalProd")))
)
此代码将源数据帧(上面提供的示例)转换为 objective 数据帧(上面的示例)。
它通过以排序的方式(按日期顺序)遍历其所有 800 行来做到这一点,并且对于它的每一行:
- 更改与给定行匹配的所有产品的有效日期
- 在我们找到中间产品的情况下更新原始产品 ID
产品。例如,如果我们有一个产品从 ID "A" 换成
"B" 以及从 "B" 到 "C" 稍后的时间,我们将需要带有
能够加入我们的原始产品 ID(在本例中为 "A")
结果与原始交易 table,其中仅包含
产品 ID "A".
以及在集群中使用这段代码时抛出的错误:
Exception in thread "main" java.lang.WhosebugError
at scala.collection.GenSetLike$class.apply(GenSetLike.scala:44)
at scala.collection.AbstractSet.apply(Set.scala:47)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply.apply(TreeNode.scala:334)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
如何使此代码在集群中以其在本地正常工作的方式工作?
谢谢!
我会检查本地机器和集群上 Spark 执行器配置的差异。可能是在本地机器上创建的线程数 (tasks/cores) 可能少于在集群中的执行程序中创建的任务数。减少每个执行器的核心数量将减少在执行器 jvm 中创建的线程数,因此线程堆栈占用的space。或者,您可以尝试增加每个执行程序的内存。最好在两台机器上保持执行程序的配置相同,然后查看问题是否重现。
我花了一段时间才弄明白你想做什么。我想你可以用更简单的方法来做同样的事情。
这并不能解释为什么您的代码不起作用,但是您的 foldleft 可以替换为 spark sql 查询,如下所示:
df_MPC_SOURCE.registerTempTable("mpc_source")
val test = sqlContext.sql(
"""select c1.N_PRODUCT_ID,c1.D_EFFECTIVE_CHANGE,c1.N_PRODUCT_ID_NEW,
|coalesce(c2.D_EFFECTIVE_CHANGE,c1.MPC_endDate) as MPC_endDate,
|coalesce(c3.N_PRODUCT_ID,c1.MPC_original) as MPC_original
|from mpc_source c1
|left join mpc_source c2 on c1.N_DISTRIBUTOR_CODE=c2.N_DISTRIBUTOR_CODE
|and c1.N_CONTRACT_CODE=c2.N_CONTRACT_CODE
|and c1.N_PRODUCT_ID_NEW=c2.N_PRODUCT_ID
|left join mpc_source c3 on c1.N_DISTRIBUTOR_CODE=c3.N_DISTRIBUTOR_CODE
|and c1.N_CONTRACT_CODE=c3.N_CONTRACT_CODE
|and c1.MPC_original = c3.N_PRODUCT_ID_NEW
""".stripMargin)
希望对您有所帮助。
我正在使用以下代码,以便使用其行来重塑数据框以进行此重塑。
数据框包含产品更改其 ID 的日期,但为了将其与包含交易的其他巨大数据框连接起来,我需要一个定义有效 ID 范围的新列。
例如,如果产品 A 更改为产品 B,生效日期为 01/01,然后更改为产品 C,生效日期为 03/01,我需要将开始日期和结束日期放在同一行中,以便我可以将它与按产品有效 B(或 C)日期过滤的巨大交易数据框结合起来,这样我就可以将产品正确地重命名为它们的有效真实 ID。
另外一条资料,df_MPC大约800行,不会再增长了。
所以我正在尝试的方法(在开发环境中 运行 时有效)是 foldleft。
MPC 数据框的汇总版本为:
Product | Date | NewProd
A | 01/01/2018| B
B | 03/01/2018| C
Objective:
Product | Date | NewProd | OriginalProd | EndDate
A | 01/01/2018| B | A | 03/01
B | 03/01/2018| C | A | 31/12-9999
(OriginalProd 列是与交易数据框的最终连接所必需的)
导致Whosebug的代码如下:
var rowList = new ListBuffer[Row]()
val it = df_MPC_SOURCE.toLocalIterator()
while (it.hasNext) { rowList += it.next()}
val df_MPC_TRANSFORMED = rowList.reverse
.foldLeft(df_MPC_pre_edit_source: DataFrame)((acc, elem) => acc
.withColumn("EndDate",
when((col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE"))
&& col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
&& (col("N_PRODUCT_ID_NEW") === elem.getAs("N_PRODUCT_ID")),
elem.getAs("D_EFFECTIVE_CHANGE"))
.otherwise(col("EndDate")))
.withColumn("OriginalProd",
when((col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE"))
&& col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
&& (col("MPC_original") === elem.getAs("N_PRODUCT_ID_NEW")),
elem.getAs("N_PRODUCT_ID"))
.otherwise(col("OriginalProd")))
)
此代码将源数据帧(上面提供的示例)转换为 objective 数据帧(上面的示例)。
它通过以排序的方式(按日期顺序)遍历其所有 800 行来做到这一点,并且对于它的每一行:
- 更改与给定行匹配的所有产品的有效日期
- 在我们找到中间产品的情况下更新原始产品 ID 产品。例如,如果我们有一个产品从 ID "A" 换成 "B" 以及从 "B" 到 "C" 稍后的时间,我们将需要带有 能够加入我们的原始产品 ID(在本例中为 "A") 结果与原始交易 table,其中仅包含 产品 ID "A".
以及在集群中使用这段代码时抛出的错误:
Exception in thread "main" java.lang.WhosebugError
at scala.collection.GenSetLike$class.apply(GenSetLike.scala:44)
at scala.collection.AbstractSet.apply(Set.scala:47)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply.apply(TreeNode.scala:334)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
如何使此代码在集群中以其在本地正常工作的方式工作? 谢谢!
我会检查本地机器和集群上 Spark 执行器配置的差异。可能是在本地机器上创建的线程数 (tasks/cores) 可能少于在集群中的执行程序中创建的任务数。减少每个执行器的核心数量将减少在执行器 jvm 中创建的线程数,因此线程堆栈占用的space。或者,您可以尝试增加每个执行程序的内存。最好在两台机器上保持执行程序的配置相同,然后查看问题是否重现。
我花了一段时间才弄明白你想做什么。我想你可以用更简单的方法来做同样的事情。
这并不能解释为什么您的代码不起作用,但是您的 foldleft 可以替换为 spark sql 查询,如下所示:
df_MPC_SOURCE.registerTempTable("mpc_source")
val test = sqlContext.sql(
"""select c1.N_PRODUCT_ID,c1.D_EFFECTIVE_CHANGE,c1.N_PRODUCT_ID_NEW,
|coalesce(c2.D_EFFECTIVE_CHANGE,c1.MPC_endDate) as MPC_endDate,
|coalesce(c3.N_PRODUCT_ID,c1.MPC_original) as MPC_original
|from mpc_source c1
|left join mpc_source c2 on c1.N_DISTRIBUTOR_CODE=c2.N_DISTRIBUTOR_CODE
|and c1.N_CONTRACT_CODE=c2.N_CONTRACT_CODE
|and c1.N_PRODUCT_ID_NEW=c2.N_PRODUCT_ID
|left join mpc_source c3 on c1.N_DISTRIBUTOR_CODE=c3.N_DISTRIBUTOR_CODE
|and c1.N_CONTRACT_CODE=c3.N_CONTRACT_CODE
|and c1.MPC_original = c3.N_PRODUCT_ID_NEW
""".stripMargin)
希望对您有所帮助。