在 spark 的 python 中从 UDF(用户定义函数)访问全局变量

Access global variable from UDF (User Defined Function) in python in spark

我正在尝试从 python 中的 pyspark.sql.functions.udf 函数内部更改全局变量。但是,更改没有反映在全局变量中。

连同输出的可重现示例是:

counter = 0

schema2 = StructType([\
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)   
])

data2 = [(1, "A"), (2, "B")]

df = spark.createDataFrame(data = data2, schema = schema2)

def myFunc(column):
    global counter
    counter = counter + 1
    return column + 5
  
myFuncUDF = udf(myFunc, IntegerType())

display(df.withColumn('id1', myFuncUDF(df.id)))

输出:

id 名字 id1
1 一个 6
2 B 7

当我打印计数器变量时,它仍然是 0。

谁能帮我知道如何访问 UDF 中的全局变量并在每次调用 UDF 时更改全局变量? 或者这是否不可能?

我们可以创建一个自定义集合累加器来存储 ID。

class SetAccumulator(AccumulatorParam):
    def zero(self, init_value: set()):
        return init_value
    
    def addInPlace(self, v1: set, v2: set):
        return v1.union(v2)

初始化集合累加器,并在转换数据帧时从我们的 spark 作业 运行 所在的每个线程添加到累加器。参考-

#accumulator initialization
acc = spark.sparkContext.accumulator(set(), SetAccumulator())

schema2 = StructType([\
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)   
])

data2 = [(1, "A"), (2, "B")]

df = spark.createDataFrame(data = data2, schema = schema2)

#access accumulator as a global variable inside the udf 
def myFunc(column):
    global acc
    int_set = set()
    int_set.add(column)
    acc += int_set
    return column + 5
  
myFuncUDF = udf(myFunc, IntegerType())