DataFrame 操作的奇怪行为
Weird behavior of DataFrame operations
考虑代码:
val df1 = spark.table("t1").filter(col("c1")=== lit(127))
val df2 = spark.sql("select x,y,z from ORCtable")
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter")
df3.select($"y_R",$"z_R").show(500,false)
这会产生警告WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
代码失败java.lang.OutOfMemoryError: GC overhead limit exceeded
。
但是如果我运行下面的代码:
val df1 = spark.table("t1").filter(col("c1")=== lit(127))
val df2 = spark.sql("select x,y,z from ORCtable limit 2000000")//only difference here
//ORC table has 1651343 rows so doesn't exceed limit 2000000
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter")
df3.select($"y_R",$"z_R").show(500,false)
这会产生正确的输出。我不知道为什么会发生这种情况以及发生了什么变化。有人可以帮助理解这一点吗?
回答我自己的问题:Spark physical execution plan
生成相同 dataframe
的两种方式不同,可以通过调用 .explain()
方法进行检查。
第一种方式使用 broadcast-hash join
导致 java.lang.OutOfMemoryError: GC overhead limit exceeded
而后一种方式运行 sort-merge join
通常较慢但不会对垃圾收集造成太大压力。
物理执行计划的这种差异是由 df2 dataframe
上的附加 filter
操作引起的。
考虑代码:
val df1 = spark.table("t1").filter(col("c1")=== lit(127))
val df2 = spark.sql("select x,y,z from ORCtable")
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter")
df3.select($"y_R",$"z_R").show(500,false)
这会产生警告WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
代码失败java.lang.OutOfMemoryError: GC overhead limit exceeded
。
但是如果我运行下面的代码:
val df1 = spark.table("t1").filter(col("c1")=== lit(127))
val df2 = spark.sql("select x,y,z from ORCtable limit 2000000")//only difference here
//ORC table has 1651343 rows so doesn't exceed limit 2000000
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter")
df3.select($"y_R",$"z_R").show(500,false)
这会产生正确的输出。我不知道为什么会发生这种情况以及发生了什么变化。有人可以帮助理解这一点吗?
回答我自己的问题:Spark physical execution plan
生成相同 dataframe
的两种方式不同,可以通过调用 .explain()
方法进行检查。
第一种方式使用 broadcast-hash join
导致 java.lang.OutOfMemoryError: GC overhead limit exceeded
而后一种方式运行 sort-merge join
通常较慢但不会对垃圾收集造成太大压力。
物理执行计划的这种差异是由 df2 dataframe
上的附加 filter
操作引起的。