访问 Spark RDD 时在闭包中使用局部变量
Usage of local variables in closures when accessing Spark RDDs
我对访问 Spark RDD 时闭包中局部变量的使用有疑问。我想解决的问题如下所示:
我有一个应该读入 RDD 的文本文件列表。
但是,首先我需要向从单个文本文件创建的 RDD 添加附加信息。此附加信息是从文件名中提取的。然后,使用union()将RDD放入一个大的RDD中。
from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)
list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
tmp_rdd = spark_context.textFile(filename)
# extract_file_info('file_from_Owner.txt') == 'Owner'
file_owner = extract_file_info(filename)
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
# However, this does not work:
# The result is that always Bert will be the owner, i.e., never Ernie.
问题是循环中的 map() 函数没有引用“正确的”file_owner。相反,它将引用 file_owner 的最新值。在我的本地机器上,我通过为每个 RDD 调用 cache() 函数设法解决了这个问题:
# ..
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
tmp_rdd.cache()
# ..
我的问题:使用 cache() 是我问题的正确解决方案吗?还有其他选择吗?
非常感谢!
因此,您正在执行的 cache() 方法不一定会在 100% 的时间内工作,只要没有节点失败并且不需要重新计算分区,它就可以工作。一个简单的解决方案是创建一个函数,它将 "capture" 的值设为 file_owner。这是潜在解决方案的 pyspark shell 中的一个快速小插图:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.2.0-SNAPSHOT
/_/
Using Python version 2.7.6 (default, Mar 22 2014 22:59:56)
SparkContext available as sc.
>>> hi = "hi"
>>> sc.parallelize(["panda"])
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:365
>>> r = sc.parallelize(["panda"])
>>> meeps = r.map(lambda x : x + hi)
>>> hi = "by"
>>> meeps.collect()
['pandaby']
>>> hi = "hi"
>>> def makeGreetFunction(param):
... return (lambda x: x + param)
...
>>> f = makeGreetFunction(hi)
>>> hi="by"
>>> meeps = r.map(f)
>>> meeps.collect()
['pandahi']
>>>
您可以制作一个文件所有者数组并在地图转换中使用它:
file_owner[i] = extract_file_info(filename)
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner[i]))
这不是 Spark 现象,而是一个普通的 Python。
>>> fns = []
>>> for i in range(3):
... fns.append(lambda: i)
...
>>> for fn in fns:
... print fn()
...
2
2
2
避免它的一种方法是声明默认参数的函数。默认值在声明时计算。
>>> fns = []
>>> for i in range(3):
... def f(i=i):
... return i
... fns.append(f)
...
>>> for fn in fns:
... print fn()
...
0
1
2
这个问题经常出现,看看这些其他问题:
- Lexical closures in Python
- What do (lambda) function closures capture?
正如其他人所解释的那样,您的 lambda 函数的问题在于它将在执行时评估 file_owner
。要在 for 循环迭代期间强制对其求值,您必须创建 并执行 构造函数。以下是使用 lambda 表达式的方法:
# ...
file_owner = extract_file_info(filename)
tmp_rdd = tmp_rdd.map((lambda owner: lambda line: (line,owner))(file_owner))
# ...
我对访问 Spark RDD 时闭包中局部变量的使用有疑问。我想解决的问题如下所示:
我有一个应该读入 RDD 的文本文件列表。 但是,首先我需要向从单个文本文件创建的 RDD 添加附加信息。此附加信息是从文件名中提取的。然后,使用union()将RDD放入一个大的RDD中。
from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)
list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
tmp_rdd = spark_context.textFile(filename)
# extract_file_info('file_from_Owner.txt') == 'Owner'
file_owner = extract_file_info(filename)
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
# However, this does not work:
# The result is that always Bert will be the owner, i.e., never Ernie.
问题是循环中的 map() 函数没有引用“正确的”file_owner。相反,它将引用 file_owner 的最新值。在我的本地机器上,我通过为每个 RDD 调用 cache() 函数设法解决了这个问题:
# ..
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
tmp_rdd.cache()
# ..
我的问题:使用 cache() 是我问题的正确解决方案吗?还有其他选择吗?
非常感谢!
因此,您正在执行的 cache() 方法不一定会在 100% 的时间内工作,只要没有节点失败并且不需要重新计算分区,它就可以工作。一个简单的解决方案是创建一个函数,它将 "capture" 的值设为 file_owner。这是潜在解决方案的 pyspark shell 中的一个快速小插图:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.2.0-SNAPSHOT
/_/
Using Python version 2.7.6 (default, Mar 22 2014 22:59:56)
SparkContext available as sc.
>>> hi = "hi"
>>> sc.parallelize(["panda"])
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:365
>>> r = sc.parallelize(["panda"])
>>> meeps = r.map(lambda x : x + hi)
>>> hi = "by"
>>> meeps.collect()
['pandaby']
>>> hi = "hi"
>>> def makeGreetFunction(param):
... return (lambda x: x + param)
...
>>> f = makeGreetFunction(hi)
>>> hi="by"
>>> meeps = r.map(f)
>>> meeps.collect()
['pandahi']
>>>
您可以制作一个文件所有者数组并在地图转换中使用它:
file_owner[i] = extract_file_info(filename)
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner[i]))
这不是 Spark 现象,而是一个普通的 Python。
>>> fns = []
>>> for i in range(3):
... fns.append(lambda: i)
...
>>> for fn in fns:
... print fn()
...
2
2
2
避免它的一种方法是声明默认参数的函数。默认值在声明时计算。
>>> fns = []
>>> for i in range(3):
... def f(i=i):
... return i
... fns.append(f)
...
>>> for fn in fns:
... print fn()
...
0
1
2
这个问题经常出现,看看这些其他问题:
- Lexical closures in Python
- What do (lambda) function closures capture?
正如其他人所解释的那样,您的 lambda 函数的问题在于它将在执行时评估 file_owner
。要在 for 循环迭代期间强制对其求值,您必须创建 并执行 构造函数。以下是使用 lambda 表达式的方法:
# ...
file_owner = extract_file_info(filename)
tmp_rdd = tmp_rdd.map((lambda owner: lambda line: (line,owner))(file_owner))
# ...