How to rank the column values based on a dictionary and retain the highest value?

Suppose I have a dataframe like this:

| id | col     |
| 1  | "A,B,C" |
| 2  | "D,C"   |
| 3  | "B,C,A" |
| 4  | None    |

The dictionary is:

d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

The output dataframe must be:

| id | col  |
| 1  | "A"  |
| 2  | "C"  |
| 3  | "A"  |
| 4  | None |
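
In plain Python terms, for each row I want to keep the best-ranked letter, i.e. the one with the smallest weight in d (None stays None). Something like this per-row sketch, where the helper name pick is just illustrative:

# Pure-Python sketch of the per-row logic, not the Spark solution itself.
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

def pick(col):
    if col is None:
        return None  # row 4 stays None
    return min(col.split(","), key=d.get)

print(pick("A,B,C"))  # A
print(pick("D,C"))    # C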

I assume you want to sort the letters based on the values given in the dictionary d.

You can then do the following:

from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("sort_column_test").getOrCreate()

df = spark.createDataFrame(data=(Row(1, "A,B,C",),
                                 Row(2, "D,C",),
                                 Row(3, "B,C,A",),
                                 Row(4, None)),
                           schema="id:int, col:string")
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

# Define a sort UDF that sorts the array according to the dictionary 'd'; it also handles None arrays
sort_udf = F.udf(lambda array: sorted(array,
                                      key=lambda x: d[x]) if array is not None else None,
                 T.ArrayType(T.StringType()))
df = df.withColumn("col", sort_udf(F.split(F.col("col"), ",")).getItem(0))
df.show()

"""
+---+----+
| id| col|
+---+----+
|  1|   A|
|  2|   C|
|  3|   A|
|  4|null|
+---+----+
"""





Higher Order Functions - transform can be used to rank the elements of col according to the dictionary, after which the lowest-ranked element is kept.

from pyspark.sql import functions as F
from itertools import chain

data = [(1, "A,B,C",),
        (2, "D,C",),
        (3, "B,C,A",),
        (4, None,), ]
df = spark.createDataFrame(data, ("id", "col", ))

d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

mapper = F.create_map([F.lit(c) for c in chain.from_iterable(d.items())])

"""
Mapper has the value Column<'map(A, 1, B, 2, C, 3, D, 4)'>
"""

(df.withColumn("col", F.split(F.col("col"), ",")) # Split the string to create an array
  .withColumn("mapper", mapper) # Add the mapping column to the dataframe
  .withColumn("col", F.expr("transform(col, x -> struct(mapper[x] as rank, x as col))")) # Turn each element into a (rank, col) struct, looking up its rank in mapper
  .withColumn("col", F.array_min(F.col("col"))["col"]) # array_min picks the struct with the smallest rank, since structs compare field by field
).select("id", "col").show()

"""
+---+----+
| id| col|
+---+----+
|  1|   A|
|  2|   C|
|  3|   A|
|  4|null|
+---+----+
"""

Here is another solution with the same struct-ordering idea as @Nithish's answer, but using arrays_zip and array_min instead:

  1. Create an array of weights from the dictionary, ordered by its sorted keys
  2. Zip the weights array with the alphabetically sorted result of splitting col
  3. Take the array_min of the zipped struct array and keep its second field

import pyspark.sql.functions as F

df = spark.createDataFrame([(1, "A,B,C"), (2, "D,C"), (3, "B,C,A"), (4, None)], ["id", "col"])
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

result = df.withColumn(
    "col",
    F.array_min(
        F.arrays_zip(
            F.array(*[F.lit(d[x]) for x in sorted(d)]), 
            F.array_sort(F.split("col", ","))
        )
    )["1"]
)

result.show()
#+---+----+
#| id| col|
#+---+----+
#|  1|   A|
#|  2|   C|
#|  3|   A|
#|  4|null|
#+---+----+
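
Note that step 2 only lines up each letter with a sensible weight because d happens to assign ascending weights in alphabetical key order, so the sorted letters and the sorted-by-key weights stay in sync; arrays_zip also pads the shorter array with nulls. To see exactly what array_min receives, you can inspect the zipped intermediate:

# Inspect the intermediate (weight, letter) structs that array_min scans.
# Row 1 ("A,B,C") yields [{1, A}, {2, B}, {3, C}, {4, null}], and
# array_min picks {1, A}.
df.withColumn(
    "zipped",
    F.arrays_zip(
        F.array(*[F.lit(d[x]) for x in sorted(d)]),
        F.array_sort(F.split("col", ","))
    )
).show(truncate=False)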