为什么 pyspark 代码没有并行化到所有执行程序?

Why is the pyspark code not parallelizing to all the executors?

我已经在 dataproc 上创建了一个 7 节点集群(1 个主节点和 6 个执行器。3 个主执行器和 3 个次级可抢占执行器)。我可以在控制台中看到集群已创建并已更正。我有所有 6 个 ips 和 VM 名称。我正在尝试测试集群,但似乎所有执行程序上的代码都不是 运行,但最多只有 2 个。以下是我用来检查代码执行的执行者数量的代码:

import numpy as np
import socket
set(sc.parallelize(range(1,1000000)).map(lambda x : socket.gethostname()).collect())

输出:

{'monsoon-testing-sw-543d', 'monsoon-testing-sw-p7w7'}

我已经多次重启内核,但是,尽管执行者改变了执行代码的执行者数量保持不变。

有人可以帮助我了解这里发生了什么以及为什么 pyspark 没有将我的代码并行化到所有执行程序吗?

您有很多执行器要处理,但没有足够的数据分区来处理。可以在parallelize()方法中加入参数numSlices来定义应该创建多少分区:

rdd = sc.parallelize(range(1,1000000), numSlices=12)

分区数至少应等于或大于最佳工作分配的执行器数。

顺便说一句:使用 rdd.getNumPartitions() 你可以获得 RDD 中的分区数。