Query withColumn Pyspark to add a column dataframe based on array
I have a dataframe:
"people" "other"
father ...
mother
cat
brother
dog
I want to add a new column: if the people column contains a word from a given array, replace it with the corresponding word from a second array; otherwise the word stays unchanged:
array=['father','mother','brother']
array_new=['dad','mum','bro']
"people" "other" "new"
father ... dad
mother mum
cat cat
brother bro
dog dog
I tried this:
expression = ("CASE " + "".join(["WHEN people LIKE '{}' THEN '{}' ".format(val, array_new[array.index(val)]) for val in array]) + "ELSE 'None' END")
df_pyspark = df_pyspark.withColumn("new", functions.expr(expression))
I should change the ELSE condition so that it copies the same word, but I don't know how to do that.
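For what it's worth, the CASE expression itself can be fixed directly: falling back to the column instead of the literal 'None' only requires `ELSE people` in the generated SQL. A minimal sketch of building that string (the arrays are the ones from the question):

```python
array = ['father', 'mother', 'brother']
array_new = ['dad', 'mum', 'bro']

# Same CASE expression as above, but the ELSE branch returns the
# column itself instead of the literal string 'None'.
expression = (
    "CASE "
    + "".join(
        "WHEN people LIKE '{}' THEN '{}' ".format(old, new)
        for old, new in zip(array, array_new)
    )
    + "ELSE people END"
)
print(expression)
# CASE WHEN people LIKE 'father' THEN 'dad' ... ELSE people END
```

This string can then be passed to `functions.expr` exactly as in the question.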
You can create a literal map expression from array and array_new using the create_map function:
from pyspark.sql import functions as F
from itertools import chain

# Interleave keys and values: father, dad, mother, mum, brother, bro
mapping = F.create_map(*[F.lit(x) for x in chain(*zip(array, array_new))])
# Map lookup returns null for unmatched words; coalesce falls back to "people"
df1 = df.withColumn("new", F.coalesce(mapping[F.col("people")], F.col("people")))
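The map-lookup-plus-coalesce pattern has the same semantics as a dict lookup with the original value as the default. A plain-Python sketch of that behavior, using the sample data from the question:

```python
array = ['father', 'mother', 'brother']
array_new = ['dad', 'mum', 'bro']

# create_map builds a key-to-value map; coalesce falls back to the
# original value when the lookup yields null. In plain Python this
# is dict.get(key, key).
mapping = dict(zip(array, array_new))
people = ['father', 'mother', 'cat', 'brother', 'dog']
new = [mapping.get(p, p) for p in people]
print(new)  # ['dad', 'mum', 'cat', 'bro', 'dog']
```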
Alternatively, use na.replace, passing it a dict created from the two arrays:
from pyspark.sql import functions as F

# Copy "people" into "new", then replace matching values in place
mapping_dict = dict(zip(array, array_new))
df1 = df.withColumn("new", F.col("people")).na.replace(mapping_dict, subset=["new"])
Another approach is to chain multiple when expressions:
from functools import reduce
from pyspark.sql import functions as F

# Seed the reduce with the functions module itself, so the first
# acc.when(...) call is F.when(...); each step chains another clause.
when_expr = reduce(
    lambda acc, x: acc.when(F.col("people") == x[0], x[1]),
    zip(array, array_new),
    F
).otherwise(F.col("people"))
df1 = df.withColumn("new", when_expr)