How to join DataFrames in PySpark

I have two DataFrames, DF1 and DF2, with the following contents:

df1:

line_item_usage_account_id  line_item_unblended_cost    name 
100000000001                12.05                       account1
200000000001                52                          account2
300000000003                12.03                       account3

df2:

accountname     accountproviderid   clustername     app_pmo     app_costcenter
account1        100000000001        cluster1        111111      11111111
account2        200000000001        cluster2        222222      22222222

I need to join on the fields df1.line_item_usage_account_id and df2.accountproviderid.

When the two fields hold the same ID, the value of DF1's line_item_unblended_cost column must be added to the matching row. When the value of DF1's line_item_usage_account_id is not present in DF2's accountproviderid column, the df1 row must still be appended, like this:

accountname     accountproviderid   clustername     app_pmo     app_costcenter      line_item_unblended_cost
account1        100000000001        cluster1        111111      11111111            12.05
account2        200000000001        cluster2        222222      22222222            52
account3        300000000003        NA              NA          NA                  12.03

The account3 data is appended at the end of the new DataFrame, with DF2's columns filled in with "NA".

Thanks in advance for any help.
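A left outer join on df1 does exactly this: every df1 row is kept, and the df2 columns come back as null wherever there is no matching accountproviderid. A runnable example: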

from pyspark.sql import SparkSession   
spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([
    [100000000001, 12.05, 'account1'], 
    [200000000001, 52.00, 'account2'], 
    [300000000003, 12.03, 'account3']], 
    schema=['line_item_usage_account_id', 'line_item_unblended_cost', 'name'])

df1.show()
df1.printSchema()

df2 = spark.createDataFrame([
    ['account1', 100000000001, 'cluster1', 111111, 11111111],
    ['account2', 200000000001, 'cluster2', 222222, 22222222]], 
    schema=['accountname', 'accountproviderid', 'clustername', 'app_pmo', 'app_costcenter'])

df2.printSchema()
df2.show()

cols = ['name', 'line_item_usage_account_id', 'clustername', 'app_pmo', 'app_costcenter', 'line_item_unblended_cost']

# "leftouter" keeps every df1 row; the df2 columns become null where no match exists.
resDF = (df1
    .join(df2, df1.line_item_usage_account_id == df2.accountproviderid, "leftouter")
    .select(*cols)
    .withColumnRenamed('name', 'accountname')
    .withColumnRenamed('line_item_usage_account_id', 'accountproviderid')
    .orderBy('accountname'))
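Note that the select keeps df1's line_item_usage_account_id (renamed to accountproviderid) rather than df2's accountproviderid; that is why account3 still shows 300000000003 in the result instead of null.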

resDF.printSchema()
# |-- accountname: string (nullable = true)
# |-- accountproviderid: long (nullable = true)
# |-- clustername: string (nullable = true)
# |-- app_pmo: long (nullable = true)
# |-- app_costcenter: long (nullable = true)
# |-- line_item_unblended_cost: double (nullable = true)

resDF.show()
# +-----------+-----------------+-----------+-------+--------------+------------------------+
# |accountname|accountproviderid|clustername|app_pmo|app_costcenter|line_item_unblended_cost|
# +-----------+-----------------+-----------+-------+--------------+------------------------+
# |   account1|     100000000001|   cluster1| 111111|      11111111|                   12.05|
# |   account2|     200000000001|   cluster2| 222222|      22222222|                    52.0|
# |   account3|     300000000003|       null|   null|          null|                   12.03|
# +-----------+-----------------+-----------+-------+--------------+------------------------+
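The show() output prints null for the missing df2 values. If you literally need the string "NA", as in the expected table, one option is a sketch on top of resDF above: cast the numeric df2-side columns to string and then fill, since na.fill with a string value only touches string-typed columns.

from pyspark.sql import functions as F

# Cast the df2-side numeric columns to string so a string fill can reach them,
# then replace the remaining nulls with the literal "NA".
filledDF = (resDF
    .withColumn('app_pmo', F.col('app_pmo').cast('string'))
    .withColumn('app_costcenter', F.col('app_costcenter').cast('string'))
    .na.fill('NA', subset=['clustername', 'app_pmo', 'app_costcenter']))

filledDF.show()
# account3's row now shows NA in clustername, app_pmo and app_costcenter.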