MapType 列值上的 PySpark 杠杆函数
PySpark Leverage Function on MapType Column Values
下面是一个数据框,代表了我要完成的任务。不过请注意,我要利用的函数比这个例子要复杂一些。
import pyspark
from pyspark.sql import SparkSession
arrayData = [
('1',{1:100,2:200}),
('1',{1:100,2:None})]
df=spark.createDataFrame(data=arrayData, schema = ['id','value'])
我想做的是利用 withColumn 创建一个新列,其中包含应用了函数的新地图类型对象。
假设我想对每个值求平方。我知道我可以创建一个将值乘以 2 的 udf 并使用 withColumn ...但是,这似乎不适用于应用到 MapType。
我要实现的输出是:
arrayData = [
('1',{1:200,2:200},{1:400,2:400}),
('1',{1:100,2:None},{1:200,2:None})]
df=spark.createDataFrame(data=arrayData, schema = ['id','value','newCol'])
最后,我需要保持并行性并试图避免爆炸,因为我想将它保持在一行中。如何实现?
不确定为什么它对你不起作用,但这个 UDF 对我来说非常好
import pyspark.sql.functions as F
def sq(m):
return {k:e**2 if e is not None else None for k,e in m.items()}
print(sq({1:100,2:200})) # {1: 10000, 2: 40000}
print(sq({1:100,2:None})) # {1: 10000, 2: None}
(df
.withColumn('newCol', F.udf(sq, T.MapType(T.LongType(), T.LongType()))('value'))
.show(10, False)
)
# Output
# +---+---------------------+------------------------+
# |id |value |newCol |
# +---+---------------------+------------------------+
# |1 |{1 -> 100, 2 -> 200} |{1 -> 10000, 2 -> 40000}|
# |1 |{1 -> 100, 2 -> null}|{1 -> 10000, 2 -> null} |
# +---+---------------------+------------------------+
下面是一个数据框,代表了我要完成的任务。不过请注意,我要利用的函数比这个例子要复杂一些。
import pyspark
from pyspark.sql import SparkSession
arrayData = [
('1',{1:100,2:200}),
('1',{1:100,2:None})]
df=spark.createDataFrame(data=arrayData, schema = ['id','value'])
我想做的是利用 withColumn 创建一个新列,其中包含应用了函数的新地图类型对象。
假设我想对每个值求平方。我知道我可以创建一个将值乘以 2 的 udf 并使用 withColumn ...但是,这似乎不适用于应用到 MapType。
我要实现的输出是:
arrayData = [
('1',{1:200,2:200},{1:400,2:400}),
('1',{1:100,2:None},{1:200,2:None})]
df=spark.createDataFrame(data=arrayData, schema = ['id','value','newCol'])
最后,我需要保持并行性并试图避免爆炸,因为我想将它保持在一行中。如何实现?
不确定为什么它对你不起作用,但这个 UDF 对我来说非常好
import pyspark.sql.functions as F
def sq(m):
return {k:e**2 if e is not None else None for k,e in m.items()}
print(sq({1:100,2:200})) # {1: 10000, 2: 40000}
print(sq({1:100,2:None})) # {1: 10000, 2: None}
(df
.withColumn('newCol', F.udf(sq, T.MapType(T.LongType(), T.LongType()))('value'))
.show(10, False)
)
# Output
# +---+---------------------+------------------------+
# |id |value |newCol |
# +---+---------------------+------------------------+
# |1 |{1 -> 100, 2 -> 200} |{1 -> 10000, 2 -> 40000}|
# |1 |{1 -> 100, 2 -> null}|{1 -> 10000, 2 -> null} |
# +---+---------------------+------------------------+