案例明智地使用列映射来填充 pyspark 数据框中另一列的值
Case wise using mapping from columns to fill value in another column in a pyspark dataframe
我有一个包含多列的数据框:
+-----------+-----------+-----------+
| col1| col2| col3|
+-----------+-----------+-----------+
| s1| c1| p3|
| s2| c1| p3|
| s1| c3| p3|
| s3| c4| p4|
| s4| c5| p4|
| s2| c6| p4|
+-----------+-----------+-----------+
现在我想要实现的是,我想通过使用比方说 dict 从多列映射创建一个新列(因为唯一值的数量很大,单个或 case 语句会很乏味)。
这个想法是首先映射 col1 的值,然后如果新列中有剩余的空值,则从 col2 映射它们,然后如果有更多的空值,则从 col3 映射它们,最后将剩余的空值映射到替换为 str 文字。:
col1_map = {'s1' : 'apple', 's3' : 'orange'}
col2_map = {'c1' : 'potato', 'c6' : 'tomato'}
col3_map = {'p3' : 'ball', 'p4' : 'bat'}
最终输出如下所示:
+-----------+-----------+-----------+-----------+
| col1| col2| col3| col4|
+-----------+-----------+-----------+-----------+
| s1| c1| p3| apple|
| s2| c1| p3| potato|
| s1| c3| p3| apple|
| s3| c4| p4| orange|
| s4| c5| p4| bat|
| s2| c6| p4| tomato|
+-----------+-----------+-----------+-----------+
到目前为止,我的方法是创建一个新列。然后到
from itertools import chain
from pyspark.sql.functions import create_map, lit
mapping_expr = create_map([lit(x) for x in chain(*col1_dict.items())])
df = df.withColumn('col4', mapping_expr[df['col4']])
这将从 col1 的映射中获取 col4 中的值。但是我的问题是,如果我对 col2 重复此操作,并且 col4 中已经有来自 col1 的映射值,则新映射将替换它。我不要那个。
有没有人建议在新列中保持这种添加值的顺序?
你做的几乎是对的,只是你需要连续使用mapping_expr
。
from pyspark.sql.functions import col, create_map, lit, when
from itertools import chain
values = [('s1','c1','p3'),('s2','c1','p3'),('s1','c3','p3'),('s3','c4','p4'),('s4','c5','p4'),('s2','c6','p4')]
df = sqlContext.createDataFrame(values,['col1','col2','col3'])
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| s1| c1| p3|
| s2| c1| p3|
| s1| c3| p3|
| s3| c4| p4|
| s4| c5| p4|
| s2| c6| p4|
+----+----+----+
字典,由您提供并创建它的映射
col1_map = {'s1' : 'apple', 's3' : 'orange'}
col2_map = {'c1' : 'potato', 'c6' : 'tomato'}
col3_map = {'p3' : 'ball', 'p4' : 'bat'}
#Applying the mapping of dictionary.
mapping_expr1 = create_map([lit(x) for x in chain(*col1_map.items())])
mapping_expr2 = create_map([lit(x) for x in chain(*col2_map.items())])
mapping_expr3 = create_map([lit(x) for x in chain(*col3_map.items())])
终于应用了 create_map()
in succession. All I am doing in addition, is checking if after operating on col1/col2
we still have null, which can be checked using isNull()
功能。
df=df.withColumn('col4', mapping_expr1.getItem(col('col1')))
df=df.withColumn('col4', when(col('col4').isNull(),mapping_expr2.getItem(col('col2'))).otherwise(col('col4')))
df=df.withColumn('col4', when(col('col4').isNull(),mapping_expr3.getItem(col('col3'))).otherwise(col('col4')))
df.show()
+----+----+----+------+
|col1|col2|col3| col4|
+----+----+----+------+
| s1| c1| p3| apple|
| s2| c1| p3|potato|
| s1| c3| p3| apple|
| s3| c4| p4|orange|
| s4| c5| p4| bat|
| s2| c6| p4|tomato|
+----+----+----+------+
我有一个包含多列的数据框:
+-----------+-----------+-----------+
| col1| col2| col3|
+-----------+-----------+-----------+
| s1| c1| p3|
| s2| c1| p3|
| s1| c3| p3|
| s3| c4| p4|
| s4| c5| p4|
| s2| c6| p4|
+-----------+-----------+-----------+
现在我想要实现的是,我想通过使用比方说 dict 从多列映射创建一个新列(因为唯一值的数量很大,单个或 case 语句会很乏味)。 这个想法是首先映射 col1 的值,然后如果新列中有剩余的空值,则从 col2 映射它们,然后如果有更多的空值,则从 col3 映射它们,最后将剩余的空值映射到替换为 str 文字。:
col1_map = {'s1' : 'apple', 's3' : 'orange'}
col2_map = {'c1' : 'potato', 'c6' : 'tomato'}
col3_map = {'p3' : 'ball', 'p4' : 'bat'}
最终输出如下所示:
+-----------+-----------+-----------+-----------+
| col1| col2| col3| col4|
+-----------+-----------+-----------+-----------+
| s1| c1| p3| apple|
| s2| c1| p3| potato|
| s1| c3| p3| apple|
| s3| c4| p4| orange|
| s4| c5| p4| bat|
| s2| c6| p4| tomato|
+-----------+-----------+-----------+-----------+
到目前为止,我的方法是创建一个新列。然后到
from itertools import chain
from pyspark.sql.functions import create_map, lit
mapping_expr = create_map([lit(x) for x in chain(*col1_dict.items())])
df = df.withColumn('col4', mapping_expr[df['col4']])
这将从 col1 的映射中获取 col4 中的值。但是我的问题是,如果我对 col2 重复此操作,并且 col4 中已经有来自 col1 的映射值,则新映射将替换它。我不要那个。 有没有人建议在新列中保持这种添加值的顺序?
你做的几乎是对的,只是你需要连续使用mapping_expr
。
from pyspark.sql.functions import col, create_map, lit, when
from itertools import chain
values = [('s1','c1','p3'),('s2','c1','p3'),('s1','c3','p3'),('s3','c4','p4'),('s4','c5','p4'),('s2','c6','p4')]
df = sqlContext.createDataFrame(values,['col1','col2','col3'])
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| s1| c1| p3|
| s2| c1| p3|
| s1| c3| p3|
| s3| c4| p4|
| s4| c5| p4|
| s2| c6| p4|
+----+----+----+
字典,由您提供并创建它的映射
col1_map = {'s1' : 'apple', 's3' : 'orange'}
col2_map = {'c1' : 'potato', 'c6' : 'tomato'}
col3_map = {'p3' : 'ball', 'p4' : 'bat'}
#Applying the mapping of dictionary.
mapping_expr1 = create_map([lit(x) for x in chain(*col1_map.items())])
mapping_expr2 = create_map([lit(x) for x in chain(*col2_map.items())])
mapping_expr3 = create_map([lit(x) for x in chain(*col3_map.items())])
终于应用了 create_map()
in succession. All I am doing in addition, is checking if after operating on col1/col2
we still have null, which can be checked using isNull()
功能。
df=df.withColumn('col4', mapping_expr1.getItem(col('col1')))
df=df.withColumn('col4', when(col('col4').isNull(),mapping_expr2.getItem(col('col2'))).otherwise(col('col4')))
df=df.withColumn('col4', when(col('col4').isNull(),mapping_expr3.getItem(col('col3'))).otherwise(col('col4')))
df.show()
+----+----+----+------+
|col1|col2|col3| col4|
+----+----+----+------+
| s1| c1| p3| apple|
| s2| c1| p3|potato|
| s1| c3| p3| apple|
| s3| c4| p4|orange|
| s4| c5| p4| bat|
| s2| c6| p4|tomato|
+----+----+----+------+