How to rank the column values based on a dictionary and retain the highest value?
Suppose I have a DataFrame as follows:

| id | col
| 1  | "A,B,C"
| 2  | "D,C"
| 3  | "B,C,A"
| 4  | None

The dictionary is:

d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

The output DataFrame must be:

| id | col
| 1  | "A"
| 2  | "C"
| 3  | "A"
| 4  | None
I assume you want to sort the letters according to the values given in the dictionary d.
Then you can do the following:
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("sort_column_test").getOrCreate()

df = spark.createDataFrame(data=[Row(1, "A,B,C"),
                                 Row(2, "D,C"),
                                 Row(3, "B,C,A"),
                                 Row(4, None)],
                           schema="id:int, col:string")

d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

# Define a sort UDF that sorts the array according to the dictionary 'd'; it also handles None arrays
sort_udf = F.udf(lambda array: sorted(array, key=lambda x: d[x]) if array is not None else None,
                 T.ArrayType(T.StringType()))

df = df.withColumn("col", sort_udf(F.split(F.col("col"), ",")).getItem(0))
df.show()
"""
+---+----+
| id| col|
+---+----+
| 1| A|
| 2| C|
| 3| A|
| 4|null|
+---+----+
"""
Higher Order Functions - Transform
You can rank the elements of col according to the dictionary, then take the lowest-ranked element.
from pyspark.sql import functions as F
from itertools import chain
data = [(1, "A,B,C",),
(2, "D,C",),
(3, "B,C,A",),
(4, None,), ]
df = spark.createDataFrame(data, ("id", "col", ))
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}
mapper = F.create_map([F.lit(c) for c in chain.from_iterable(d.items())])
"""
Mapper has the value Column<'map(A, 1, B, 2, C, 3, D, 4)'>
"""
(df.withColumn("col", F.split(F.col("col"), ","))  # Split the string to create an array
   .withColumn("mapper", mapper)  # Add the mapping column to the dataframe
   .withColumn("col", F.expr("transform(col, x -> struct(mapper[x] as rank, x as col))"))  # Iterate over the array and look up each rank from the mapper
   .withColumn("col", F.array_min(F.col("col"))["col"])  # array_min finds the minimum struct based on its first field (rank)
 ).select("id", "col").show()
"""
+---+----+
| id| col|
+---+----+
| 1| A|
| 2| C|
| 3| A|
| 4|null|
+---+----+
"""
Here is another solution with struct ordering as in @Nithish's answer, but using arrays_zip and array_min instead:
- create an array of weights from the dictionary (sorted by key)
- zip the weights array with the sorted result of splitting col
- take the array minimum of the zipped struct array
import pyspark.sql.functions as F
df = spark.createDataFrame([(1, "A,B,C"), (2, "D,C"), (3, "B,C,A"), (4, None)], ["id", "col"])
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}
result = df.withColumn(
"col",
F.array_min(
F.arrays_zip(
F.array(*[F.lit(d[x]) for x in sorted(d)]),
F.array_sort(F.split("col", ","))
)
)["1"]
)
result.show()
#+---+----+
#| id| col|
#+---+----+
#| 1| A|
#| 2| C|
#| 3| A|
#| 4|null|
#+---+----+
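Note that this zip-based variant pairs positional weights with the alphabetically sorted letters, so it relies on the dictionary's ranks increasing in the same order as the sorted keys. The pairing logic can be checked in plain Python (a sketch; the helper name `lowest` is mine):

```python
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

# Weights in key order, as built by F.array(*[F.lit(d[x]) for x in sorted(d)])
weights = [d[x] for x in sorted(d)]  # [1, 2, 3, 4]

def lowest(col):
    # arrays_zip + array_min: pair each weight with the sorted letters, keep
    # the pair with the smallest weight; [1] extracts the letter, like ["1"].
    return min(zip(weights, sorted(col.split(","))))[1]

print(lowest("B,C,A"))  # A
print(lowest("D,C"))    # C
```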