如何使用 Python class 处理 RDD?
How to process RDDs using a Python class?
我在 Spark 中将模型实现为 python class,但每次我尝试将 class 方法映射到 RDD 时都会失败。我的实际代码更复杂,但这个简化版本抓住了问题的核心:
class model(object):
def __init__(self):
self.data = sc.textFile('path/to/data.csv')
# other misc setup
def run_model(self):
self.data = self.data.map(self.transformation_function)
def transformation_function(self,row):
row = row.split(',')
return row[0]+row[1]
现在,如果我 运行 模型是这样的(例如):
test = model()
test.run_model()
test.data.take(10)
我收到以下错误:
异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能用在驱动程序上,不能用在它 运行 工作人员的代码中。有关详细信息,请参阅 SPARK-5063。
我玩过这个,它似乎在我尝试将 class 方法映射到 class 中的 RDD 时可靠地发生。我已经确认,如果我在 class 结构之外实现,映射函数工作正常,所以问题肯定与 class 有关。有办法解决这个问题吗?
这里的问题比使用 嵌套 RDD 或在转换 中执行 Spark 操作要微妙一些。 Spark 不允许访问 SparkContext
内部操作或转换。
即使您没有显式访问它,它也会在闭包内被引用,并且必须被序列化并随身携带。这意味着您的 transformation
方法(引用 self
)也保留 SparkContext
,因此出现错误。
处理此问题的一种方法是使用静态方法:
class model(object):
@staticmethod
def transformation_function(row):
row = row.split(',')
return row[0]+row[1]
def __init__(self):
self.data = sc.textFile('some.csv')
def run_model(self):
self.data = self.data.map(model.transformation_function)
编辑:
如果你想访问实例变量,你可以尝试这样的事情:
class model(object):
@staticmethod
def transformation_function(a_model):
delim = a_model.delim
def _transformation_function(row):
return row.split(delim)
return _transformation_function
def __init__(self):
self.delim = ','
self.data = sc.textFile('some.csv')
def run_model(self):
self.data = self.data.map(model.transformation_function(self))
我在 Spark 中将模型实现为 python class,但每次我尝试将 class 方法映射到 RDD 时都会失败。我的实际代码更复杂,但这个简化版本抓住了问题的核心:
class model(object):
def __init__(self):
self.data = sc.textFile('path/to/data.csv')
# other misc setup
def run_model(self):
self.data = self.data.map(self.transformation_function)
def transformation_function(self,row):
row = row.split(',')
return row[0]+row[1]
现在,如果我 运行 模型是这样的(例如):
test = model()
test.run_model()
test.data.take(10)
我收到以下错误:
异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能用在驱动程序上,不能用在它 运行 工作人员的代码中。有关详细信息,请参阅 SPARK-5063。
我玩过这个,它似乎在我尝试将 class 方法映射到 class 中的 RDD 时可靠地发生。我已经确认,如果我在 class 结构之外实现,映射函数工作正常,所以问题肯定与 class 有关。有办法解决这个问题吗?
这里的问题比使用 嵌套 RDD 或在转换 中执行 Spark 操作要微妙一些。 Spark 不允许访问 SparkContext
内部操作或转换。
即使您没有显式访问它,它也会在闭包内被引用,并且必须被序列化并随身携带。这意味着您的 transformation
方法(引用 self
)也保留 SparkContext
,因此出现错误。
处理此问题的一种方法是使用静态方法:
class model(object):
@staticmethod
def transformation_function(row):
row = row.split(',')
return row[0]+row[1]
def __init__(self):
self.data = sc.textFile('some.csv')
def run_model(self):
self.data = self.data.map(model.transformation_function)
编辑:
如果你想访问实例变量,你可以尝试这样的事情:
class model(object):
@staticmethod
def transformation_function(a_model):
delim = a_model.delim
def _transformation_function(row):
return row.split(delim)
return _transformation_function
def __init__(self):
self.delim = ','
self.data = sc.textFile('some.csv')
def run_model(self):
self.data = self.data.map(model.transformation_function(self))