Select values from MapType Column in UDF PySpark
I am trying to extract values from a MapType column of a PySpark dataframe inside a UDF.
Below is the PySpark dataframe:
+-----------+------------+-------------+
|CUSTOMER_ID|col_a |col_b |
+-----------+------------+-------------+
| 100 |{0.0 -> 1.0}| {0.2 -> 1.0}|
| 101 |{0.0 -> 1.0}| {0.2 -> 1.0}|
| 102 |{0.0 -> 1.0}| {0.2 -> 1.0}|
| 103 |{0.0 -> 1.0}| {0.2 -> 1.0}|
| 104 |{0.0 -> 1.0}| {0.2 -> 1.0}|
| 105 |{0.0 -> 1.0}| {0.2 -> 1.0}|
+-----------+------------+-------------+
df.printSchema()
# root
# |-- CUSTOMER_ID: integer (nullable = true)
# |-- col_a: map (nullable = true)
# | |-- key: float
# | |-- value: float (valueContainsNull = true)
# |-- col_b: map (nullable = true)
# | |-- key: float
# | |-- value: float (valueContainsNull = true)
Below is the UDF:
from pyspark.sql import functions as F, types as T

@F.udf(T.FloatType())
def test(col):
    return col[1]
And here is how it is called:
df_temp = df_temp.withColumn('test', test(F.col('col_a')))
When I pass it to the UDF, I am not getting the value from the col_a column. Can anyone explain why?
That's because your map has nothing at key=1.
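Inside a Python UDF, a MapType value arrives as a plain Python dict, so col[1] looks up the key 1, which your map ({0.0: 1.0}) does not contain. A quick illustration in plain Python (the variable m is just for demonstration):
m = {0.0: 1.0}   # what the UDF receives for col_a

m.get(1)         # None -> surfaces as null in Spark
m.get(0.0)       # 1.0  -> the key that actually exists
# m[1] would raise KeyError: plain dict indexing has
# no null fallback for a missing key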
df_temp = spark.createDataFrame([(100,),(101,),(102,)],['CUSTOMER_ID']) \
.withColumn('col_a', F.create_map(F.lit(0.0), F.lit(1.0)))
df_temp.show()
# +-----------+------------+
# |CUSTOMER_ID| col_a|
# +-----------+------------+
# | 100|{0.0 -> 1.0}|
# | 101|{0.0 -> 1.0}|
# | 102|{0.0 -> 1.0}|
# +-----------+------------+
df_temp = df_temp.withColumn('col_a_0', F.col('col_a')[0])
df_temp = df_temp.withColumn('col_a_1', F.col('col_a')[1])
df_temp.show()
# +-----------+------------+-------+-------+
# |CUSTOMER_ID| col_a|col_a_0|col_a_1|
# +-----------+------------+-------+-------+
# | 100|{0.0 -> 1.0}| 1.0| null|
# | 101|{0.0 -> 1.0}| 1.0| null|
# | 102|{0.0 -> 1.0}| 1.0| null|
# +-----------+------------+-------+-------+
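If you still want a UDF, one option is to ignore the key and return the first value in the map. This is only a sketch, and the name first_map_value is illustrative:
@F.udf(T.FloatType())
def first_map_value(col):
    # col is a Python dict; return its first value,
    # or None (Spark null) for a null/empty map
    return next(iter(col.values()), None) if col else None

df_temp.withColumn('test', first_map_value(F.col('col_a'))).show()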
To extract the values of a MapType column without a UDF, use map_values():
df_temp.withColumn('col_a_1', F.array_join(F.map_values('col_a'), ',')).show()
# +-----------+------------+-------+
# |CUSTOMER_ID|       col_a|col_a_1|
# +-----------+------------+-------+
# |        100|{0.0 -> 1.0}|    1.0|
# |        101|{0.0 -> 1.0}|    1.0|
# |        102|{0.0 -> 1.0}|    1.0|
# +-----------+------------+-------+
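And if the key is known up front, you don't need a UDF at all: element_at from pyspark.sql.functions (like plain bracket indexing on the column) returns null for a missing key under the default non-ANSI settings. A sketch assuming the key 0.0 from the data above (col_a_val is an illustrative column name):
df_temp.withColumn('col_a_val', F.element_at('col_a', F.lit(0.0))).show()
# col_a_val is 1.0 for every row here, since each map is {0.0 -> 1.0}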