Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes
I am running a Spark application on the following configuration:
1 master and 2 worker nodes.
Each worker has 88 cores, so the total number of cores is 176.
Each worker has 502 GB of memory, so the total available memory is 1004 GB.
I am getting the following exception while running the application:
Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$$anonfun$apply.apply(BroadcastExchangeExec.scala:115)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$$anonfun$apply.apply(BroadcastExchangeExec.scala:73)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture.apply(BroadcastExchangeExec.scala:72)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture.apply(BroadcastExchangeExec.scala:72)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The error itself suggests two solutions:
As a workaround, you can disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1,
or
increase the Spark driver memory by setting spark.driver.memory to a higher value (a sketch of how to supply these settings follows).
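A minimal sketch of how the two settings could be supplied when the session is built; the application name and the 10g figure are only illustrative, not from the original post. Note that spark.driver.memory usually has to be passed at launch time (for example spark-submit --driver-memory 10g), because the driver JVM is already running by the time application code executes.

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("BroadcastOomRepro")                          // illustrative name
        // Workaround 1: disable automatic broadcast joins entirely.
        .config("spark.sql.autoBroadcastJoinThreshold", "-1")
        // Workaround 2: request more driver memory (only effective if set before the driver starts,
        // so in client mode prefer spark-submit --driver-memory instead of setting it here).
        .config("spark.driver.memory", "10g")
        .getOrCreate();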
I am trying to run with more driver memory, but I would like to understand the root cause of this issue. Can anyone explain it?
I am using Java in my code.
Edit 1
I am using broadcast variables in my code.
Edit 2
Adding the code that contains the broadcast variables.
// 1. Read the currency reference table, cache it, and collect the currency codes to the driver.
Dataset<Row> currencySet1 = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_CURRENCY_CD).load();
currencySetCache = currencySet1.select(CURRENCY_CD, DECIMAL_POSITIONS).persist(StorageLevel.MEMORY_ONLY());
Dataset<Row> currencyCodes = currencySetCache.select(CURRENCY_CD);
currencySet = currencyCodes.as(Encoders.STRING()).collectAsList();

// 2. Read the division reference table, cache it, and collect the divisions to the driver.
Dataset<Row> divisionSet = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_CIS_DIVISION).load();
divisionSetCache = divisionSet.select(CIS_DIVISION).persist(StorageLevel.MEMORY_ONLY());
divisionList = divisionSetCache.as(Encoders.STRING()).collectAsList();

// 3. Read the user reference table, cache it, and collect the user IDs to the driver.
Dataset<Row> userIdSet = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", SC_USER).load();
userIdSetCache = userIdSet.select(USER_ID).persist(StorageLevel.MEMORY_ONLY());
userIdList = userIdSetCache.as(Encoders.STRING()).collectAsList();

// Broadcast the collected lists to the executors.
ClassTag<List<String>> evidenceForDivision = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForDiv = context.broadcast(divisionList, evidenceForDivision);
ClassTag<List<String>> evidenceForCurrency = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForCurrency = context.broadcast(currencySet, evidenceForCurrency);
ClassTag<List<String>> evidenceForUserID = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForUserID = context.broadcast(userIdList, evidenceForUserID);

// Validation -- Start
Encoder<RuleParamsBean> encoder = Encoders.bean(RuleParamsBean.class);
Dataset<RuleParamsBean> ds = new Dataset<RuleParamsBean>(sparkSession, finalJoined.logicalPlan(), encoder);
Dataset<RuleParamsBean> validateDataset = ds.map(ruleParamsBean -> validateTransaction(ruleParamsBean, broadcastVarForDiv.value(), broadcastVarForCurrency.value(),
        broadcastVarForUserID.value()), encoder);
validateDataset.persist(StorageLevel.MEMORY_ONLY());
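A minimal alternative sketch, not from the original post: if the SparkContext is wrapped in a JavaSparkContext, the explicit ClassTag construction above is not needed, since the Java API's broadcast() supplies it internally (assuming divisionList, currencySet and userIdList have already been collected as shown).

import java.util.List;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

// Wrap the existing SparkContext; broadcast() on the Java API needs no ClassTag argument.
JavaSparkContext javaContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
Broadcast<List<String>> broadcastVarForDiv = javaContext.broadcast(divisionList);
Broadcast<List<String>> broadcastVarForCurrency = javaContext.broadcast(currencySet);
Broadcast<List<String>> broadcastVarForUserID = javaContext.broadcast(userIdList);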
Possible root cause: the default value of "spark.driver.memory" is only 1 GB (depending on the distribution), which is a very small amount. If you read a lot of data onto the driver, an OutOfMemoryError can easily occur, and the suggestion in the exception message is correct.
Solution: increase both "spark.driver.memory" and "spark.executor.memory" to at least 16 GB.
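A minimal sketch of how to verify what the running application actually sees, assuming the values were passed at submission time (for example spark-submit --driver-memory 16g --executor-memory 16g); setting spark.driver.memory inside the application itself takes effect too late, because the driver JVM is already running.

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// Read back the effective settings; the second argument is the fallback printed
// when the key was never set explicitly.
System.out.println("driver memory:   " + spark.conf().get("spark.driver.memory", "1g (default)"));
System.out.println("executor memory: " + spark.conf().get("spark.executor.memory", "1g (default)"));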