join 和 reduceByKey 中的 spark 执行器内存不足
spark executor out of memory in join and reduceByKey
在spark2.0中,我有两个dataframes,我需要先加入它们,然后做一个reduceByKey来聚合数据。我总是在执行者中遇到 OOM。提前致谢。
数据
d1(1G,5亿行,缓存,按col id2分区)
id1 id2
1 1
1 3
1 4
2 0
2 7
...
d2(160G,200万行,缓存,按col id2分区,col值包含5000个浮点数的列表)
id2 value
0 [0.1, 0.2, 0.0001, ...]
1 [0.001, 0.7, 0.0002, ...]
...
现在我需要加入两个 table 来获得 d3 我使用 spark.sql
select d1.id1, d2.value
FROM d1 JOIN d2 ON d1.id2 = d2.id2
然后我在 d3 上执行 reduceByKey 并汇总 table d1
中每个 id1 的值
d4 = d3.rdd.reduceByKey(lambda x, y: numpy.add(x, y)) \
.mapValues(lambda x: (x / numpy.linalg.norm(x, 1)).toList)\
.toDF()
我估计d4的大小是340G。现在我在 r3.8xlarge 机器上使用 运行 作业
mem: 244G
cpu: 64
Disk: 640G
问题
我尝试了一些配置,但我总是在执行程序中遇到 OOM。所以,问题是
是否可以在当前类型的机器上 运行 此作业?或者我应该使用更大的机器(有多大?)。但我记得我遇到过 articles/blogs 说用相对较小的机器进行 TB 级处理。
我应该做什么样的改进?例如spark配置,代码优化?
是否可以估算每个执行器所需的内存量?
Spark 配置
我试过的一些 Spark 配置
配置 1:
--verbose
--conf spark.sql.shuffle.partitions=200
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=24G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc - XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 4
--executor-memory 48G
--executor-cores 15
--driver-memory 24G
--driver-cores 3
配置 2:
--verbose
--conf spark.sql.shuffle.partitions=10000
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=24G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 4
--executor-memory 48G
--executor-cores 15
--driver-memory 24G
--driver-cores 3
配置 3:
--verbose
--conf spark.sql.shuffle.partitions=10000
--conf spark.dynamicAllocation.enabled=true
--conf spark.driver.maxResultSize=6G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--executor-memory 6G
--executor-cores 2
--driver-memory 6G
--driver-cores 3
配置 4:
--verbose
--conf spark.sql.shuffle.partitions=20000
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=6G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 13
--executor-memory 15G
--executor-cores 5
--driver-memory 13G
--driver-cores 5
错误
来自执行器的 OOM Error1
ExecutorLostFailure (executor 14 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Heap
PSYoungGen total 1830400K, used 1401721K [0x0000000740000000, 0x00000007be900000, 0x00000007c0000000)
eden space 1588736K, 84% used [0x0000000740000000,0x0000000791e86980,0x00000007a0f80000)
from space 241664K, 24% used [0x00000007af600000,0x00000007b3057de8,0x00000007be200000)
to space 236032K, 0% used [0x00000007a0f80000,0x00000007a0f80000,0x00000007af600000)
ParOldGen total 4194304K, used 4075884K [0x0000000640000000, 0x0000000740000000, 0x0000000740000000)
object space 4194304K, 97% used [0x0000000640000000,0x0000000738c5b198,0x0000000740000000)
Metaspace used 59721K, capacity 60782K, committed 61056K, reserved 1101824K
class space used 7421K, capacity 7742K, committed 7808K, reserved 1048576K
来自执行器的 OOM Error2
ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container marked as failed: container_1477662810360_0002_01_000008 on host: ip-172-18-9-130.ec2.internal. Exit status: 52. Diagnostics: Exception from container-launch.
Heap
PSYoungGen total 1968128K, used 1900544K [0x0000000740000000, 0x00000007c0000000, 0x00000007c0000000)
eden space 1900544K, 100% used [0x0000000740000000,0x00000007b4000000,0x00000007b4000000)
from space 67584K, 0% used [0x00000007b4000000,0x00000007b4000000,0x00000007b8200000)
to space 103936K, 0% used [0x00000007b9a80000,0x00000007b9a80000,0x00000007c0000000)
ParOldGen total 4194304K, used 4194183K [0x0000000640000000, 0x0000000740000000, 0x0000000740000000)
object space 4194304K, 99% used [0x0000000640000000,0x000000073ffe1f38,0x0000000740000000)
Metaspace used 59001K, capacity 59492K, committed 61056K, reserved 1101824K
class space used 7300K, capacity 7491K, committed 7808K, reserved 1048576K
容器错误
16/10/28 14:33:21 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
16/10/28 14:33:26 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$doExecute$$anon.hasNext(WholeStageCodegenExec.scala:386)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/10/28 14:33:36 ERROR Utils: Uncaught exception in thread driver-heartbeater
16/10/28 14:33:26 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Double.valueOf(Double.java:519)
at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.get(UnsafeArrayData.java:138)
at org.apache.spark.sql.catalyst.util.ArrayData.foreach(ArrayData.scala:135)
at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:64)
at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:57)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:2517)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:2517)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:121)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/10/28 14:33:43 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[stdout writer for python,5,main]
更新 1
如果我按 id2 分区,数据 d1 看起来很倾斜。结果,join会导致OOM。如果 d1 像我之前想的那样均匀分布,上面的配置应该可以工作。
更新 2
我发布了解决问题的尝试,以防有人遇到类似问题。
尝试 1
我的问题是,如果我按 id2 对 d1 进行分区,那么数据就会非常倾斜。因此,存在一些包含几乎所有 id1 的分区。因此,与d2 的JOIN 会导致OOM 错误。为了缓解这样的问题,我首先从 id2 中识别出一个子集 s
,如果按 id2 进行分区,它可能会导致这种倾斜的数据。然后我从 d2 创建一个 d5,仅包括 s
,从 d2 创建一个 d6,不包括 s
。幸运的是,d5 的大小不是太大。所以,我可以广播 join d1 和 d5。然后我加入 d1 和 d6。然后,我合并这两个结果并执行 reduceByKey。我非常接近解决问题。我没有继续这种方式,因为我的 d1 以后可能会变得更大。换句话说,这种方法对我来说并不是真正可扩展的
尝试 2
幸运的是,在我的例子中,d2 中的大多数值都非常小。根据我的应用程序,我可以安全地删除小值并将向量转换为 sparseVector 以显着减小 d2 的大小。这样做之后,我按 id1 对 d1 进行分区并广播连接 d2(在删除小值之后)。当然,必须增加驱动程序内存才能允许较大的广播变量。这对我有用,也适用于我的应用程序。
这里有一些尝试:稍微减少执行程序的大小。您目前拥有:
--executor-memory 48G
--executor-cores 15
试一试:
--executor-memory 16G
--executor-cores 5
出于各种原因,较小的执行程序大小似乎是最佳选择。其中之一是 java 堆大小大于 32G 会导致对象引用从 4 字节变为 8 字节,并且所有内存需求都会爆炸。
编辑:问题实际上可能是 d4 分区太大(尽管其他建议仍然适用!)。您可以通过将 d3 重新分区为更多分区(大约 d1 * 4)或将其传递给 reduceByKey
的 numPartitions
可选参数来解决此问题。这两个选项都会触发随机播放,但这总比崩溃好。
我遇到了同样的问题,但是我搜索了很多答案都不能解决我的问题。
最终,我一步步调试我的代码。我发现每个分区的数据大小不平衡引起的问题。只是 df_rdd.repartition(nums)
在spark2.0中,我有两个dataframes,我需要先加入它们,然后做一个reduceByKey来聚合数据。我总是在执行者中遇到 OOM。提前致谢。
数据
d1(1G,5亿行,缓存,按col id2分区)
id1 id2
1 1
1 3
1 4
2 0
2 7
...
d2(160G,200万行,缓存,按col id2分区,col值包含5000个浮点数的列表)
id2 value
0 [0.1, 0.2, 0.0001, ...]
1 [0.001, 0.7, 0.0002, ...]
...
现在我需要加入两个 table 来获得 d3 我使用 spark.sql
select d1.id1, d2.value
FROM d1 JOIN d2 ON d1.id2 = d2.id2
然后我在 d3 上执行 reduceByKey 并汇总 table d1
中每个 id1 的值d4 = d3.rdd.reduceByKey(lambda x, y: numpy.add(x, y)) \
.mapValues(lambda x: (x / numpy.linalg.norm(x, 1)).toList)\
.toDF()
我估计d4的大小是340G。现在我在 r3.8xlarge 机器上使用 运行 作业
mem: 244G
cpu: 64
Disk: 640G
问题
我尝试了一些配置,但我总是在执行程序中遇到 OOM。所以,问题是
是否可以在当前类型的机器上 运行 此作业?或者我应该使用更大的机器(有多大?)。但我记得我遇到过 articles/blogs 说用相对较小的机器进行 TB 级处理。
我应该做什么样的改进?例如spark配置,代码优化?
是否可以估算每个执行器所需的内存量?
Spark 配置
我试过的一些 Spark 配置
配置 1:
--verbose
--conf spark.sql.shuffle.partitions=200
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=24G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc - XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 4
--executor-memory 48G
--executor-cores 15
--driver-memory 24G
--driver-cores 3
配置 2:
--verbose
--conf spark.sql.shuffle.partitions=10000
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=24G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 4
--executor-memory 48G
--executor-cores 15
--driver-memory 24G
--driver-cores 3
配置 3:
--verbose
--conf spark.sql.shuffle.partitions=10000
--conf spark.dynamicAllocation.enabled=true
--conf spark.driver.maxResultSize=6G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--executor-memory 6G
--executor-cores 2
--driver-memory 6G
--driver-cores 3
配置 4:
--verbose
--conf spark.sql.shuffle.partitions=20000
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=6G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 13
--executor-memory 15G
--executor-cores 5
--driver-memory 13G
--driver-cores 5
错误
来自执行器的 OOM Error1
ExecutorLostFailure (executor 14 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Heap
PSYoungGen total 1830400K, used 1401721K [0x0000000740000000, 0x00000007be900000, 0x00000007c0000000)
eden space 1588736K, 84% used [0x0000000740000000,0x0000000791e86980,0x00000007a0f80000)
from space 241664K, 24% used [0x00000007af600000,0x00000007b3057de8,0x00000007be200000)
to space 236032K, 0% used [0x00000007a0f80000,0x00000007a0f80000,0x00000007af600000)
ParOldGen total 4194304K, used 4075884K [0x0000000640000000, 0x0000000740000000, 0x0000000740000000)
object space 4194304K, 97% used [0x0000000640000000,0x0000000738c5b198,0x0000000740000000)
Metaspace used 59721K, capacity 60782K, committed 61056K, reserved 1101824K
class space used 7421K, capacity 7742K, committed 7808K, reserved 1048576K
来自执行器的 OOM Error2
ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container marked as failed: container_1477662810360_0002_01_000008 on host: ip-172-18-9-130.ec2.internal. Exit status: 52. Diagnostics: Exception from container-launch.
Heap
PSYoungGen total 1968128K, used 1900544K [0x0000000740000000, 0x00000007c0000000, 0x00000007c0000000)
eden space 1900544K, 100% used [0x0000000740000000,0x00000007b4000000,0x00000007b4000000)
from space 67584K, 0% used [0x00000007b4000000,0x00000007b4000000,0x00000007b8200000)
to space 103936K, 0% used [0x00000007b9a80000,0x00000007b9a80000,0x00000007c0000000)
ParOldGen total 4194304K, used 4194183K [0x0000000640000000, 0x0000000740000000, 0x0000000740000000)
object space 4194304K, 99% used [0x0000000640000000,0x000000073ffe1f38,0x0000000740000000)
Metaspace used 59001K, capacity 59492K, committed 61056K, reserved 1101824K
class space used 7300K, capacity 7491K, committed 7808K, reserved 1048576K
容器错误
16/10/28 14:33:21 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
16/10/28 14:33:26 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$doExecute$$anon.hasNext(WholeStageCodegenExec.scala:386)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/10/28 14:33:36 ERROR Utils: Uncaught exception in thread driver-heartbeater
16/10/28 14:33:26 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Double.valueOf(Double.java:519)
at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.get(UnsafeArrayData.java:138)
at org.apache.spark.sql.catalyst.util.ArrayData.foreach(ArrayData.scala:135)
at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:64)
at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:57)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:2517)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:2517)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:121)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/10/28 14:33:43 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[stdout writer for python,5,main]
更新 1
如果我按 id2 分区,数据 d1 看起来很倾斜。结果,join会导致OOM。如果 d1 像我之前想的那样均匀分布,上面的配置应该可以工作。
更新 2
我发布了解决问题的尝试,以防有人遇到类似问题。
尝试 1
我的问题是,如果我按 id2 对 d1 进行分区,那么数据就会非常倾斜。因此,存在一些包含几乎所有 id1 的分区。因此,与d2 的JOIN 会导致OOM 错误。为了缓解这样的问题,我首先从 id2 中识别出一个子集 s
,如果按 id2 进行分区,它可能会导致这种倾斜的数据。然后我从 d2 创建一个 d5,仅包括 s
,从 d2 创建一个 d6,不包括 s
。幸运的是,d5 的大小不是太大。所以,我可以广播 join d1 和 d5。然后我加入 d1 和 d6。然后,我合并这两个结果并执行 reduceByKey。我非常接近解决问题。我没有继续这种方式,因为我的 d1 以后可能会变得更大。换句话说,这种方法对我来说并不是真正可扩展的
尝试 2
幸运的是,在我的例子中,d2 中的大多数值都非常小。根据我的应用程序,我可以安全地删除小值并将向量转换为 sparseVector 以显着减小 d2 的大小。这样做之后,我按 id1 对 d1 进行分区并广播连接 d2(在删除小值之后)。当然,必须增加驱动程序内存才能允许较大的广播变量。这对我有用,也适用于我的应用程序。
这里有一些尝试:稍微减少执行程序的大小。您目前拥有:
--executor-memory 48G
--executor-cores 15
试一试:
--executor-memory 16G
--executor-cores 5
出于各种原因,较小的执行程序大小似乎是最佳选择。其中之一是 java 堆大小大于 32G 会导致对象引用从 4 字节变为 8 字节,并且所有内存需求都会爆炸。
编辑:问题实际上可能是 d4 分区太大(尽管其他建议仍然适用!)。您可以通过将 d3 重新分区为更多分区(大约 d1 * 4)或将其传递给 reduceByKey
的 numPartitions
可选参数来解决此问题。这两个选项都会触发随机播放,但这总比崩溃好。
我遇到了同样的问题,但是我搜索了很多答案都不能解决我的问题。
最终,我一步步调试我的代码。我发现每个分区的数据大小不平衡引起的问题。只是 df_rdd.repartition(nums)