MapType 列值上的 PySpark 杠杆函数

PySpark Leverage Function on MapType Column Values

下面是一个数据框,代表了我要完成的任务。不过请注意,我要利用的函数比这个例子要复杂一些。

import pyspark
from pyspark.sql import SparkSession

arrayData = [
        ('1',{1:100,2:200}),
        ('1',{1:100,2:None})]

df=spark.createDataFrame(data=arrayData, schema = ['id','value'])

我想做的是利用 withColumn 创建一个新列,其中包含应用了函数的新地图类型对象。

假设我想对每个值求平方。我知道我可以创建一个将值乘以 2 的 udf 并使用 withColumn ...但是,这似乎不适用于应用到 MapType。

我要实现的输出是:

arrayData = [
        ('1',{1:200,2:200},{1:400,2:400}),
        ('1',{1:100,2:None},{1:200,2:None})]

df=spark.createDataFrame(data=arrayData, schema = ['id','value','newCol'])

最后,我需要保持并行性并试图避免爆炸,因为我想将它保持在一行中。如何实现?

不确定为什么它对你不起作用,但这个 UDF 对我来说非常好

import pyspark.sql.functions as F

def sq(m):
    return {k:e**2 if e is not None else None for k,e in m.items()}

print(sq({1:100,2:200}))  # {1: 10000, 2: 40000}
print(sq({1:100,2:None})) # {1: 10000, 2: None}

(df
    .withColumn('newCol', F.udf(sq, T.MapType(T.LongType(), T.LongType()))('value'))
    .show(10, False)
)

# Output
# +---+---------------------+------------------------+
# |id |value                |newCol                  |
# +---+---------------------+------------------------+
# |1  |{1 -> 100, 2 -> 200} |{1 -> 10000, 2 -> 40000}|
# |1  |{1 -> 100, 2 -> null}|{1 -> 10000, 2 -> null} |
# +---+---------------------+------------------------+