Adding new column to spark dataframe by getting data lookup from other dataframe
I am trying to join two dataframes using PySpark, where dataframe 1 has records that reference multiple entries in a lookup dataframe.
>>> df = spark.createDataFrame([(1, 4, 'date_from,date_to'),(1, 8, 'emp_name')], ['col1', 'col2', 'error_cloumn'])
>>> df.show()
+----+----+-----------------+
|col1|col2|     error_cloumn|
+----+----+-----------------+
|   1|   4|date_from,date_to|
|   1|   8|         emp_name|
+----+----+-----------------+
>>> look_up_df = spark.createDataFrame([('date_from','DD-MM-YY', ' text msg1'),('date_to', 'DD-MM-YY', 'test msg2'),('emp_name', 'VARCHAR(100)', 'test msg3'),('emp_type', 'VARCHAR(100)', 'test msg4')], ['column_nm', 'clmn1', 'comment'])
>>> look_up_df.show()
+---------+------------+----------+
|column_nm|       clmn1|   comment|
+---------+------------+----------+
|date_from|    DD-MM-YY| text msg1|
|  date_to|    DD-MM-YY| test msg2|
| emp_name|VARCHAR(100)| test msg3|
| emp_type|VARCHAR(100)| test msg4|
+---------+------------+----------+
Expected output: error_desc as look_up_df[column_nm] + lit('expected') + look_up_df[clmn1] + lit('and comment is') + look_up_df[comment]
output_df:
+----+----+-----------------+--------------------------------------------------------------------------------------------------------------+
|col1|col2|error_cloumn     |error_desc                                                                                                    |
+----+----+-----------------+--------------------------------------------------------------------------------------------------------------+
|1   |4   |date_from,date_to|date_from expected as DD-MM-YY and comment is text msg1, date_to expected as DD-MM-YY and comment is test msg2|
|1   |8   |emp_name         |emp_name should be VARCHAR(100) and comment is test msg3                                                      |
+----+----+-----------------+--------------------------------------------------------------------------------------------------------------+
I am trying to use the below code:
from pyspark.sql import functions as F
df = spark.createDataFrame([(1, 4, 'date_from,date_to'),(1, 8, 'emp_name')], ['col1', 'col2', 'error_cloumn'])
df.show()
look_up_df = spark.createDataFrame([('date_from','DD-MM-YY', ' text msg1'),('date_to', 'DD-MM-YY', 'test msg2'),('emp_name', 'VARCHAR(100)', 'test msg3'),('emp_type', 'VARCHAR(100)', 'test msg4')], ['column_nm', 'clmn1', 'comment'])
look_up_df.show()
output_df = df.join(look_up_df, df["error_cloumn"] == look_up_df["column_nm"]).withColumn("error_desc", F.concat(F.col('column_nm'), F.lit(' expected as '), F.col('clmn1'), F.lit(' and comment is '), F.col('comment')))
This code works when a record references a single column, but it fails for records that list multiple columns, such as date_from,date_to: the equality join compares the entire string 'date_from,date_to' against each column_nm, so no row matches.
I would suggest splitting error_cloumn in df and exploding it (split and explode are both in the pyspark.sql.functions module), then joining. You will get multiple rows; group by col1, col2 (assuming those can be the grouping keys) and you can aggregate the text the way it is shown in your expected result. Let me know if you need more support.
from pyspark.sql import functions as F
from pyspark.sql.functions import concat, lit, when, explode, split, collect_list, concat_ws

df = spark.createDataFrame([(1, 4, 'date_from,date_to'), (1, 8, 'emp_name')], ['col1', 'col2', 'error_column'])
df = df.withColumn('all_values', explode(split('error_column', ',')))  # one row per referenced column
df.show()

look_up_df = spark.createDataFrame([('date_from', 'DD-MM-YY', ' text_msg1'), ('date_to', 'DD-MM-YY', 'test_msg2'), ('emp_name', 'VARCHAR(100)', 'test_msg3'), ('emp_type', 'VARCHAR(100)', 'test_msg4')], ['column_name', 'clmn1', 'comment'])

# Build each column's message up front, wording date columns differently
look_up_df = look_up_df.withColumn(
    'msg',
    when(look_up_df.column_name.like('%date%'),
         concat(look_up_df.column_name, lit(' expected as '), look_up_df.clmn1, lit(' and comment is '), look_up_df.comment))
    .otherwise(concat(look_up_df.column_name, lit(' should be '), look_up_df.clmn1, lit(' and comment is '), look_up_df.comment))
)
display(look_up_df)

# Join on the exploded values, then collect the messages back into one string per record
result = (df.join(look_up_df, df.all_values == look_up_df.column_name, how='inner')
          .groupby('col1', 'col2', 'error_column')
          .agg(collect_list('msg').alias('final_msg'))
          .withColumn('final_msg2', concat_ws(',', 'final_msg')))
display(result)
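Note that display() is a Databricks notebook helper, not part of PySpark itself. If you are running plain PySpark, you can inspect the same result with show(); a minimal equivalent for the result dataframe built above:

# Plain-PySpark equivalent of display(result); truncate=False keeps the long messages readable
result.show(10, truncate=False)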
Well, what you are looking for is quite straightforward, it just takes multiple steps, which can get a bit confusing if not written properly. You may find this answer basically similar to the other one, but better structured. I added a comment on each step, but feel free to run it step by step to follow the logic.
from pyspark.sql import functions as F
(df
    .withColumn('error_column', F.explode(F.split('error_column', ',')))  # break multiple errors out into separate rows
.join(look_up_df.withColumnRenamed('column_nm', 'error_column'), on=['error_column'], how='inner') # rename column so we can shorten the `on` conditions
    .withColumn('error_desc', F.concat(  # concatenate into the required description
F.col('error_column'),
F.lit(' expected '),
F.col('clmn1'),
F.lit(' and comment is '),
F.col('comment'),
))
    .groupBy('col1', 'col2')  # now that errors are on separate rows, group them back together
.agg(
        F.concat_ws(', ', F.collect_list('error_column')).alias('error_column'),  # rejoin the error names, comma-separated
        F.concat_ws(', ', F.collect_list('error_desc')).alias('error_desc'),  # rejoin the descriptions, comma-separated
)
.show(10, False)
)
+----+----+------------------+---------------------------------------------------------------------------------------------------------+
|col1|col2|error_column |error_desc |
+----+----+------------------+---------------------------------------------------------------------------------------------------------+
|1 |4 |date_from, date_to|date_from expected DD-MM-YY and comment is text msg1, date_to expected DD-MM-YY and comment is test msg2|
|1 |8 |emp_name |emp_name expected VARCHAR(100) and comment is test msg3 |
+----+----+------------------+---------------------------------------------------------------------------------------------------------+
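A design note on the join itself: a lookup dataframe like this is usually tiny, so you can hint Spark to broadcast it and avoid shuffling the larger dataframe. This is a minimal sketch under that assumption, using the standard pyspark.sql.functions.broadcast hint (neither answer above does this):

from pyspark.sql import functions as F

# Broadcast the small lookup table so the join runs map-side, without a shuffle
exploded = df.withColumn('error_column', F.explode(F.split('error_column', ',')))
joined = exploded.join(
    F.broadcast(look_up_df.withColumnRenamed('column_nm', 'error_column')),
    on=['error_column'],
    how='inner',
)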