如何使用 Python class 处理 RDD?

How to process RDDs using a Python class?

我在 Spark 中将模型实现为 python class,但每次我尝试将 class 方法映射到 RDD 时都会失败。我的实际代码更复杂,但这个简化版本抓住了问题的核心:

class model(object):
    def __init__(self):
        self.data = sc.textFile('path/to/data.csv')
        # other misc setup
    def run_model(self):
        self.data = self.data.map(self.transformation_function)
    def transformation_function(self,row):
        row = row.split(',')
        return row[0]+row[1]

现在,如果我 运行 模型是这样的(例如):

test = model()
test.run_model()
test.data.take(10)

我收到以下错误:

异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能用在驱动程序上,不能用在它 运行 工作人员的代码中。有关详细信息,请参阅 SPARK-5063。

我玩过这个,它似乎在我尝试将 class 方法映射到 class 中的 RDD 时可靠地发生。我已经确认,如果我在 class 结构之外实现,映射函数工作正常,所以问题肯定与 class 有关。有办法解决这个问题吗?

这里的问题比使用 嵌套 RDD 或在转换 中执行 Spark 操作要微妙一些。 Spark 不允许访问 SparkContext 内部操作或转换。

即使您没有显式访问它,它也会在闭包内被引用,并且必须被序列化并随身携带。这意味着您的 transformation 方法(引用 self)也保留 SparkContext,因此出现错误。

处理此问题的一种方法是使用静态方法:

class model(object):
    @staticmethod
    def transformation_function(row):
        row = row.split(',')
        return row[0]+row[1]

    def __init__(self):
        self.data = sc.textFile('some.csv')

    def run_model(self):
        self.data = self.data.map(model.transformation_function)

编辑:

如果你想访问实例变量,你可以尝试这样的事情:

class model(object):
    @staticmethod
    def transformation_function(a_model):
        delim = a_model.delim
        def _transformation_function(row):
            return row.split(delim)
        return _transformation_function

    def __init__(self):
        self.delim = ','
        self.data = sc.textFile('some.csv')

    def run_model(self):
        self.data = self.data.map(model.transformation_function(self))