为什么 pyspark 会选择一个未广播的变量?

Why is pyspark picking up a variable that was not broadcast?

我正在使用 pyspark 来分析数据集,我有点惊讶为什么即使我使用的是 not[=26= 的变量,以下代码也能正常工作] 广播。

有问题的变量是 video,它在函数 filter 中使用,在连接之后。

seed = random.randint(0,999)

# df is a dataframe
# video is just one randomly sampled element
video = df.sample(False,0.001,seed).head()

# just a python list
otherVideos = [ (22,0.32),(213,0.43) ]

# transform the python list into an rdd 
resultsRdd = sc.parallelize(similarVideos)

rdd = df.rdd.map(lambda row: (row.video_id,row.title))

# perform a join between resultsRdd and rdd
# note that video.title was NOT broadcast
(resultsRdd
   .join(rdd)
   .filter(lambda pair: pair[1][1] != video.title) # HERE!!!
   .takeOrdered(10, key= lambda pair: -pair[1][0]))

我在独立模式下使用 pyspark,将以下参数传递给 pyspark-submit:

--num-executors 12 --executor-cores 4 --executor-memory 1g --master local[*]

此外,我是 运行 jupyter(新 ipython-notebooks)上的先前代码。

[重新发布评论作为答案。]

对于这个概念,我认为 link on understanding closures 是一本很好的读物。本质上,您不需要广播 RDD 范围之外的所有变量,因为闭包(在您的情况下 video)将被序列化并发送到每个执行程序和任务以在任务执行期间进行访问。当广播的数据集很大时,广播变量很有用,因为它将作为只读缓存存在,将位于执行程序上,而不是 serialized/sent/deserialized 与该执行程序上的每个任务 运行。