flatMap 在 pyspark 中的自定义对象列表
flatMap over list of custom objects in pyspark
当 运行 flatMap() 在 class 的对象列表上时出现错误。它适用于常规 python 数据类型,如 int、list 等。但是当列表包含我的 class 的对象时,我会遇到错误。这是完整的代码:
from pyspark import SparkContext
sc = SparkContext("local","WordCountBySparkKeyword")
def func(x):
if x==2:
return [2, 3, 4]
return [1]
rdd = sc.parallelize([2])
rdd = rdd.flatMap(func) # rdd.collect() now has [2, 3, 4]
rdd = rdd.flatMap(func) # rdd.collect() now has [2, 3, 4, 1, 1]
print rdd.collect() # gives expected output
# Class I'm defining
class node(object):
def __init__(self, value):
self.value = value
# Representation, for printing node
def __repr__(self):
return self.value
def foo(x):
if x.value==2:
return [node(2), node(3), node(4)]
return [node(1)]
rdd = sc.parallelize([node(2)])
rdd = rdd.flatMap(foo) #marker 2
print rdd.collect() # rdd.collect should contain nodes with values [2, 3, 4, 1, 1]
代码在标记 1 之前工作正常(在代码中注释)。问题出现在标记 2 之后。我收到的具体错误消息是 AttributeError: 'module' object has no attribute 'node'
我该如何解决这个错误?
我正在开发 ubuntu、运行 pyspark 1.4.1
您得到的错误与 flatMap
完全无关。如果您在主脚本中定义 node
class 它可以在驱动程序上访问,但不会分发给工作人员。要使其工作,您应该将 node
定义放在单独的模块中,并确保将其分发给工作人员。
- 使用
node
定义创建单独的模块,我们称之为 node.py
在您的主脚本中导入此 node
class:
from node import node
确保模块分发给工人:
sc.addPyFile("node.py")
现在一切都应该按预期工作了。
旁注:
- PEP 8 为 class 名称推荐 CapWords。这不是一个硬性要求,但它让生活更轻松
__repr__
方法应该returna string representation of an object。至少确保它是 string
,但正确的表示更好:
def __repr__(self):
return "node({0})".format(repr(self.value))
当 运行 flatMap() 在 class 的对象列表上时出现错误。它适用于常规 python 数据类型,如 int、list 等。但是当列表包含我的 class 的对象时,我会遇到错误。这是完整的代码:
from pyspark import SparkContext
sc = SparkContext("local","WordCountBySparkKeyword")
def func(x):
if x==2:
return [2, 3, 4]
return [1]
rdd = sc.parallelize([2])
rdd = rdd.flatMap(func) # rdd.collect() now has [2, 3, 4]
rdd = rdd.flatMap(func) # rdd.collect() now has [2, 3, 4, 1, 1]
print rdd.collect() # gives expected output
# Class I'm defining
class node(object):
def __init__(self, value):
self.value = value
# Representation, for printing node
def __repr__(self):
return self.value
def foo(x):
if x.value==2:
return [node(2), node(3), node(4)]
return [node(1)]
rdd = sc.parallelize([node(2)])
rdd = rdd.flatMap(foo) #marker 2
print rdd.collect() # rdd.collect should contain nodes with values [2, 3, 4, 1, 1]
代码在标记 1 之前工作正常(在代码中注释)。问题出现在标记 2 之后。我收到的具体错误消息是 AttributeError: 'module' object has no attribute 'node'
我该如何解决这个错误?
我正在开发 ubuntu、运行 pyspark 1.4.1
您得到的错误与 flatMap
完全无关。如果您在主脚本中定义 node
class 它可以在驱动程序上访问,但不会分发给工作人员。要使其工作,您应该将 node
定义放在单独的模块中,并确保将其分发给工作人员。
- 使用
node
定义创建单独的模块,我们称之为node.py
在您的主脚本中导入此
node
class:from node import node
确保模块分发给工人:
sc.addPyFile("node.py")
现在一切都应该按预期工作了。
旁注:
- PEP 8 为 class 名称推荐 CapWords。这不是一个硬性要求,但它让生活更轻松
__repr__
方法应该returna string representation of an object。至少确保它是string
,但正确的表示更好:def __repr__(self): return "node({0})".format(repr(self.value))