Adding new column to spark dataframe by getting data lookup from other dataframe

I am trying to join two DataFrames using PySpark, where DataFrame 1 has records that reference multiple entries in a lookup DataFrame.

>>> df = spark.createDataFrame([(1, 4, 'date_from,date_to'),(1, 8, 'emp_name')], ['col1', 'col2', 'error_cloumn'])
>>> df.show()

+----+----+-----------------+
|col1|col2|     error_cloumn|
+----+----+-----------------+
|   1|   4|date_from,date_to|
|   1|   8|         emp_name|
+----+----+-----------------+


>>> look_up_df = spark.createDataFrame([('date_from','DD-MM-YY', ' text msg1'),('date_to', 'DD-MM-YY', 'test msg2'),('emp_name', 'VARCHAR(100)', 'test msg3'),('emp_type', 'VARCHAR(100)', 'test msg4')], ['column_nm', 'clmn1', 'comment'])
>>> look_up_df.show()

+---------+------------+----------+
|column_nm|       clmn1|   comment|
+---------+------------+----------+
|date_from|    DD-MM-YY| text msg1|
|  date_to|    DD-MM-YY| test msg2|
| emp_name|VARCHAR(100)| test msg3|
| emp_type|VARCHAR(100)| test msg4|
+---------+------------+----------+


Expected output: an error_desc column built as look_up_df[column_nm] + lit(' expected ') + look_up_df[clmn1] + lit(' and comment is ') + look_up_df[comment]
output_df:
+----+----+-----------------+----------------------------------------------------------------------------------------------------------------+
|col1|col2|error_cloumn     |error_desc                                                                                                      |
+----+----+-----------------+----------------------------------------------------------------------------------------------------------------+
|1   |4   |date_from,date_to|date_from expected as DD-MM-YY  and comment is text msg1, date_to expected as DD-MM-YY  and comment is text msg2|
|1   |8   |emp_name         |emp_name should be VARCHAR(100) and comment is test msg3                                                        |
+----+----+-----------------+----------------------------------------------------------------------------------------------------------------+

I am trying with the below code:

from pyspark.sql import functions as F
df = spark.createDataFrame([(1, 4, 'date_from,date_to'),(1, 8, 'emp_name')], ['col1', 'col2', 'error_cloumn'])
df.show()

look_up_df = spark.createDataFrame([('date_from','DD-MM-YY', ' text msg1'),('date_to', 'DD-MM-YY', 'test msg2'),('emp_name', 'VARCHAR(100)', 'test msg3'),('emp_type', 'VARCHAR(100)', 'test msg4')], ['column_nm', 'clmn1', 'comment'])
look_up_df.show()

output_df = df.join(look_up_df, df["error_cloumn"] == look_up_df["column_nm"]).withColumn(
    "error_desc",
    F.concat(F.col('column_nm'), F.lit(' expected as '), F.col('clmn1'), F.lit(' and comment is '), F.col('comment')))

This code works for a record with a single value, but it fails when error_cloumn holds multiple column names in one record, e.g. date_from,date_to.

I suggest splitting error_cloumn in df and exploding it (both split and explode are available in the pyspark.sql.functions module), then joining. You will get multiple rows; group by col1, col2 (assuming those can act as grouping keys) and aggregate the text the way it is shown in the expected result.

Let me know if you need more support.

from pyspark.sql import functions as F
from pyspark.sql.functions import concat, lit, expr, when, explode, split, collect_list, concat_ws

df = spark.createDataFrame([(1, 4, 'date_from,date_to'),(1, 8, 'emp_name')], ['col1', 'col2', 'error_column'])
df = df.withColumn('all_values',explode(split('error_column',',')))
df.show()

look_up_df = spark.createDataFrame([('date_from','DD-MM-YY', ' text_msg1'),('date_to', 'DD-MM-YY', 'test_msg2'),('emp_name', 'VARCHAR(100)', 'test_msg3'),('emp_type', 'VARCHAR(100)', 'test_msg4')], ['column_name', 'clmn1', 'comment'])
look_up_df = (
  look_up_df.withColumn('msg', 
                        when(look_up_df.column_name.like('%date%'),concat(look_up_df.column_name,lit(' expected as '),look_up_df.clmn1, lit('  and comment is '), look_up_df.comment))
                        .otherwise(concat(look_up_df.column_name,lit(' should be '),look_up_df.clmn1, lit('  and comment is '), look_up_df.comment)) )
)
display(look_up_df)


result = (
    df.join(look_up_df, df.all_values == look_up_df.column_name, how='inner')
      .groupby('col1', 'col2', 'error_column')
      .agg(collect_list('msg').alias('final_msg'))
      .withColumn('final_msg2', concat_ws(',', 'final_msg'))
)

display(result)
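
A small usage note: display is specific to Databricks notebooks; in a plain PySpark session the final columns can be inspected with show instead, for example:

result.select('col1', 'col2', 'error_column', 'final_msg2').show(truncate=False)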

Well, what you are looking for is quite simple, just with multiple steps, and it can get a bit messy if you don't write it carefully. You may find this answer basically similar to the other one, but better structured. I've added a comment on each step, but feel free to run it step by step to follow the logic.

from pyspark.sql import functions as F

(df
    .withColumn('error_column', F.explode(F.split('error_column', ','))) # break down multiple errors into separate rows
    .join(look_up_df.withColumnRenamed('column_nm', 'error_column'), on=['error_column'], how='inner') # rename the lookup column so we can shorten the `on` condition
    .withColumn('error_desc', F.concat( # concat as per your requirement
        F.col('error_column'),
        F.lit(' expected '),
        F.col('clmn1'),
        F.lit(' and comment is '),
        F.col('comment'),
    ))
    .groupBy('col1', 'col2') # now that the errors are broken down, it's time to join them back together
    .agg(
        F.concat_ws(', ', F.collect_list('error_column')).alias('error_column'), # concatenate the errors together, comma separated
        F.concat_ws(', ', F.collect_list('error_desc')).alias('error_desc'), # concatenate the descriptions together, comma separated
    )
    )
    .show(10, False)
)

+----+----+------------------+---------------------------------------------------------------------------------------------------------+
|col1|col2|error_column      |error_desc                                                                                               |
+----+----+------------------+---------------------------------------------------------------------------------------------------------+
|1   |4   |date_from, date_to|date_from expected DD-MM-YY and comment is  text msg1, date_to expected DD-MM-YY and comment is test msg2|
|1   |8   |emp_name          |emp_name expected VARCHAR(100) and comment is test msg3                                                  |
+----+----+------------------+---------------------------------------------------------------------------------------------------------+
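
One small difference from the expected output in the question: every row here reads "... expected ...", while the question wants "emp_name should be VARCHAR(100) ...". If the wording has to vary by column, the F.concat(...) step can be swapped for an F.when expression, as in the other answer. A minimal sketch, assuming the same column names as the snippet above:

from pyspark.sql import functions as F

# Hypothetical drop-in replacement for the F.concat(...) inside .withColumn('error_desc', ...):
# date-like columns get "expected as", everything else gets "should be"
error_desc_expr = F.when(
    F.col('error_column').like('%date%'),
    F.concat(F.col('error_column'), F.lit(' expected as '), F.col('clmn1'),
             F.lit(' and comment is '), F.col('comment'))
).otherwise(
    F.concat(F.col('error_column'), F.lit(' should be '), F.col('clmn1'),
             F.lit(' and comment is '), F.col('comment'))
)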