Query withColumn Pyspark to add a column dataframe based on array

I have a dataframe:

   "people"      "other"
   father         ...
   mother
   cat
   brother
   dog

I want to add a new column: if the people column contains a word from a given array, replace it with the corresponding word from the other array; otherwise, the word stays unchanged:

array=['father','mother','brother']
array_new=['dad','mum','bro']

   "people"      "other"     "new"
   father         ...        dad
   mother                    mum
   cat                       cat
   brother                   bro
   dog                       dog

I tried using this:

expression = ("CASE " + "".join(["WHEN people LIKE '{}' THEN '{}' ".format(val, array_new[array.index(val)]) for val in array]) + "ELSE 'None' END")

df_pyspark = df_pyspark.withColumn("new", functions.expr(expression))

I think I need to change the ELSE condition so that it copies the original word, but I don't know how to do that.
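As a side note, the smallest fix to the original approach is to reference the column itself in the ELSE branch instead of a literal, so unmatched rows keep their value. A sketch building the SQL string in plain Python (the column name people is taken from the question; the Spark call is shown as a comment):

```python
array = ['father', 'mother', 'brother']
array_new = ['dad', 'mum', 'bro']

# ELSE people makes the CASE fall back to the column's own value
expression = (
    "CASE "
    + "".join(
        "WHEN people LIKE '{}' THEN '{}' ".format(old, new)
        for old, new in zip(array, array_new)
    )
    + "ELSE people END"
)
# df_pyspark = df_pyspark.withColumn("new", functions.expr(expression))
```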

You can use a literal map expression created from array and array_new with the create_map function:

from pyspark.sql import functions as F
from itertools import chain

mapping = F.create_map(*[F.lit(x) for x in chain(*zip(array, array_new))])

df1 = df.withColumn("new", F.coalesce(mapping[F.col("people")], F.col("people")))
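Here coalesce falls back to the original value whenever the key is missing from the map. In plain Python the same lookup-with-fallback is simply dict.get(key, key) (sample data from the question):

```python
array = ['father', 'mother', 'brother']
array_new = ['dad', 'mum', 'bro']
mapping = dict(zip(array, array_new))

# coalesce(mapping[people], people) behaves like mapping.get(p, p)
people = ['father', 'mother', 'cat', 'brother', 'dog']
new = [mapping.get(p, p) for p in people]
# → ['dad', 'mum', 'cat', 'bro', 'dog']
```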

Or use na.replace, passing a dictionary created from the two arrays, like this:

from pyspark.sql import functions as F

mapping_dict = dict(zip(array, array_new))

df1 = df.withColumn("new", F.col("people")).na.replace(mapping_dict, subset=["new"])

Another way, by chaining multiple when expressions:

from functools import reduce
from pyspark.sql import functions as F

when_expr = reduce(
    lambda acc, x: acc.when(F.col("people") == x[0], x[1]),
    zip(array, array_new),
    F
).otherwise(F.col("people"))

df1 = df.withColumn("new", when_expr)
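To see why the reduce works, the same fold can be written in plain Python: each step wraps the accumulated function in one more conditional, and the seed plays the role of the final otherwise that returns the value unchanged (a sketch of the control flow, not Spark code):

```python
from functools import reduce

array = ['father', 'mother', 'brother']
array_new = ['dad', 'mum', 'bro']

# Fold the pairs into nested conditionals; the seed lambda is the
# "otherwise" branch that keeps the original value.
chain = reduce(
    lambda acc, pair: (lambda v: pair[1] if v == pair[0] else acc(v)),
    zip(array, array_new),
    lambda v: v,
)

# chain('father') == 'dad', chain('cat') == 'cat'
```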