在 spark 的 python 中从 UDF(用户定义函数)访问全局变量
Access global variable from UDF (User Defined Function) in python in spark
我正在尝试从 python 中的 pyspark.sql.functions.udf
函数内部更改全局变量。但是,更改没有反映在全局变量中。
连同输出的可重现示例是:
counter = 0
schema2 = StructType([\
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
data2 = [(1, "A"), (2, "B")]
df = spark.createDataFrame(data = data2, schema = schema2)
def myFunc(column):
global counter
counter = counter + 1
return column + 5
myFuncUDF = udf(myFunc, IntegerType())
display(df.withColumn('id1', myFuncUDF(df.id)))
输出:
id
名字
id1
1
一个
6
2
B
7
当我打印计数器变量时,它仍然是 0。
谁能帮我知道如何访问 UDF 中的全局变量并在每次调用 UDF 时更改全局变量? 或者这是否不可能?
我们可以创建一个自定义集合累加器来存储 ID。
class SetAccumulator(AccumulatorParam):
def zero(self, init_value: set()):
return init_value
def addInPlace(self, v1: set, v2: set):
return v1.union(v2)
初始化集合累加器,并在转换数据帧时从我们的 spark 作业 运行 所在的每个线程添加到累加器。参考-
#accumulator initialization
acc = spark.sparkContext.accumulator(set(), SetAccumulator())
schema2 = StructType([\
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
data2 = [(1, "A"), (2, "B")]
df = spark.createDataFrame(data = data2, schema = schema2)
#access accumulator as a global variable inside the udf
def myFunc(column):
global acc
int_set = set()
int_set.add(column)
acc += int_set
return column + 5
myFuncUDF = udf(myFunc, IntegerType())
我正在尝试从 python 中的 pyspark.sql.functions.udf
函数内部更改全局变量。但是,更改没有反映在全局变量中。
连同输出的可重现示例是:
counter = 0
schema2 = StructType([\
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
data2 = [(1, "A"), (2, "B")]
df = spark.createDataFrame(data = data2, schema = schema2)
def myFunc(column):
global counter
counter = counter + 1
return column + 5
myFuncUDF = udf(myFunc, IntegerType())
display(df.withColumn('id1', myFuncUDF(df.id)))
输出:
id | 名字 | id1 |
---|---|---|
1 | 一个 | 6 |
2 | B | 7 |
当我打印计数器变量时,它仍然是 0。
谁能帮我知道如何访问 UDF 中的全局变量并在每次调用 UDF 时更改全局变量? 或者这是否不可能?
我们可以创建一个自定义集合累加器来存储 ID。
class SetAccumulator(AccumulatorParam):
def zero(self, init_value: set()):
return init_value
def addInPlace(self, v1: set, v2: set):
return v1.union(v2)
初始化集合累加器,并在转换数据帧时从我们的 spark 作业 运行 所在的每个线程添加到累加器。参考-
#accumulator initialization
acc = spark.sparkContext.accumulator(set(), SetAccumulator())
schema2 = StructType([\
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
data2 = [(1, "A"), (2, "B")]
df = spark.createDataFrame(data = data2, schema = schema2)
#access accumulator as a global variable inside the udf
def myFunc(column):
global acc
int_set = set()
int_set.add(column)
acc += int_set
return column + 5
myFuncUDF = udf(myFunc, IntegerType())