案例明智地使用列映射来填充 pyspark 数据框中另一列的值

Case wise using mapping from columns to fill value in another column in a pyspark dataframe


|       col1|       col2|       col3|
|         s1|         c1|         p3|
|         s2|         c1|         p3|
|         s1|         c3|         p3|
|         s3|         c4|         p4|
|         s4|         c5|         p4|
|         s2|         c6|         p4|

现在我想要实现的是,我想通过使用比方说 dict 从多列映射创建一个新列(因为唯一值的数量很大,单个或 case 语句会很乏味)。 这个想法是首先映射 col1 的值,然后如果新列中有剩余的空值,则从 col2 映射它们,然后如果有更多的空值,则从 col3 映射它们,最后将剩余的空值映射到替换为 str 文字。:

col1_map = {'s1' : 'apple', 's3' : 'orange'}
col2_map = {'c1' : 'potato', 'c6' : 'tomato'}
col3_map = {'p3' : 'ball', 'p4' : 'bat'}


|       col1|       col2|       col3|       col4|
|         s1|         c1|         p3|      apple|
|         s2|         c1|         p3|     potato|
|         s1|         c3|         p3|      apple|
|         s3|         c4|         p4|     orange|
|         s4|         c5|         p4|        bat|
|         s2|         c6|         p4|     tomato|


from itertools import chain
from pyspark.sql.functions import create_map, lit

mapping_expr = create_map([lit(x) for x in chain(*col1_dict.items())])

df = df.withColumn('col4', mapping_expr[df['col4']])

这将从 col1 的映射中获取 col4 中的值。但是我的问题是,如果我对 col2 重复此操作,并且 col4 中已经有来自 col1 的映射值,则新映射将替换它。我不要那个。 有没有人建议在新列中保持这种添加值的顺序?


from pyspark.sql.functions import col, create_map, lit, when
from itertools import chain
values = [('s1','c1','p3'),('s2','c1','p3'),('s1','c3','p3'),('s3','c4','p4'),('s4','c5','p4'),('s2','c6','p4')]
df = sqlContext.createDataFrame(values,['col1','col2','col3'])
|  s1|  c1|  p3|
|  s2|  c1|  p3|
|  s1|  c3|  p3|
|  s3|  c4|  p4|
|  s4|  c5|  p4|
|  s2|  c6|  p4|


col1_map = {'s1' : 'apple', 's3' : 'orange'}
col2_map = {'c1' : 'potato', 'c6' : 'tomato'}
col3_map = {'p3' : 'ball', 'p4' : 'bat'}

#Applying the mapping of dictionary.
mapping_expr1 = create_map([lit(x) for x in chain(*col1_map.items())])
mapping_expr2 = create_map([lit(x) for x in chain(*col2_map.items())])
mapping_expr3 = create_map([lit(x) for x in chain(*col3_map.items())])

终于应用了 create_map() in succession. All I am doing in addition, is checking if after operating on col1/col2 we still have null, which can be checked using isNull() 功能。

df=df.withColumn('col4', mapping_expr1.getItem(col('col1')))
df=df.withColumn('col4', when(col('col4').isNull(),mapping_expr2.getItem(col('col2'))).otherwise(col('col4')))
df=df.withColumn('col4', when(col('col4').isNull(),mapping_expr3.getItem(col('col3'))).otherwise(col('col4')))
|col1|col2|col3|  col4|
|  s1|  c1|  p3| apple|
|  s2|  c1|  p3|potato|
|  s1|  c3|  p3| apple|
|  s3|  c4|  p4|orange|
|  s4|  c5|  p4|   bat|
|  s2|  c6|  p4|tomato|