NameError: name 'acc' is not defined in pyspark accumulator

NameError: name 'acc' is not defined in pyspark accumulator

在 pyspark 中测试 Accumulator 但出错了:

def test():
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf).getOrCreate()
    rdds = sc.parallelize([Row(user="spark", item="book"), Row(user="spark", item="goods"),
                            Row(user="hadoop", item="book"), Row(user="python", item="duck")])

    acc = sc.accumulator(0)
    print("accumulator: {}".format(acc))

    def imap(row):
        global acc
        acc += 1
        return row

    rdds.map(imap).foreach(print)
    print(acc.value)

错误是:

...
return f(*args, **kwargs)
File "test_als1.py", line 205, in imap
acc += 1
NameError: name 'acc' is not defined

但是我设置了acc作为全局变量,我该如何编写代码?

只需删除此行

global acc

global 用于访问全局声明的变量,但您的变量是在函数内部声明的,您可以在嵌套的 imap 函数中直接访问它。

更多全局访问示例here

问题是 imap 引用了一个不存在的全局变量(test 中的赋值只在该函数中创建了一个局部变量)。这个简单的程序(没有 Spark)由于同样的原因失败并出现同样的错误:

def foo():
    blah = 1
    def bar():
        global blah
        print(blah)
    bar()


if __name__ == '__main__':
    foo()

在模块级别分配 acc 有效:

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf).getOrCreate()
    rdds = sc.parallelize([Row(user="spark", item="book"), Row(user="spark", item="goods"),
                           Row(user="hadoop", item="book"), Row(user="python", item="duck")])

    acc = sc.accumulator(0)
    print("accumulator: {}".format(acc))

    def imap(row):
        global acc
        acc += 1
        return row

    rdds.map(imap).foreach(print)
    print(acc.value)

如果您需要保留函数 test.

,则向 test 添加 global acc 语句是一种替代方法