How to join DataFrames in PySpark
I have two DataFrames, df1 and df2, with the following contents:
df1:
line_item_usage_account_id line_item_unblended_cost name
100000000001 12.05 account1
200000000001 52 account2
300000000003 12.03 account3
df2:
accountname accountproviderid clustername app_pmo app_costcenter
account1 100000000001 cluster1 111111 11111111
account2 200000000001 cluster2 222222 22222222
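I need to join them on df1.line_item_usage_account_id and df2.accountproviderid.
When both fields have the same ID, the value of df1's line_item_unblended_cost column must be added to the result.
When a value of df1's line_item_usage_account_id does not appear in df2's accountproviderid column, the df1 fields must still be included, as follows: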
accountname accountproviderid clustername app_pmo app_costcenter line_item_unblended_cost
account1 100000000001 cluster1 111111 11111111 12.05
account2 200000000001 cluster2 222222 22222222 52
account3 300000000003 NA NA NA 12.03
The account3 row is appended to the end of the new DataFrame, with the df2 columns filled with "NA".
Thanks in advance for any help.
# A left outer join keeps every df1 row; df2 columns with no match come back as null.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([
[100000000001, 12.05, 'account1'],
[200000000001, 52.00, 'account2'],
[300000000003, 12.03, 'account3']],
schema=['line_item_usage_account_id', 'line_item_unblended_cost', 'name' ])
df1.show()
df1.printSchema()
df2 = spark.createDataFrame([
['account1', 100000000001, 'cluster1', 111111, 11111111],
['account2', 200000000001, 'cluster2', 222222, 22222222]],
schema=['accountname', 'accountproviderid', 'clustername', 'app_pmo', 'app_costcenter'])
df2.printSchema()
df2.show()
cols = ['name', 'line_item_usage_account_id', 'clustername', 'app_pmo', 'app_costcenter', 'line_item_unblended_cost']
resDF = (
    df1.join(df2, df1.line_item_usage_account_id == df2.accountproviderid, "leftouter")
    .select(*cols)
    .withColumnRenamed('name', 'accountname')
    .withColumnRenamed('line_item_usage_account_id', 'accountproviderid')
    .orderBy('accountname')
)
resDF.printSchema()
# |-- accountname: string (nullable = true)
# |-- accountproviderid: long (nullable = true)
# |-- clustername: string (nullable = true)
# |-- app_pmo: long (nullable = true)
# |-- app_costcenter: long (nullable = true)
# |-- line_item_unblended_cost: double (nullable = true)
resDF.show()
# +-----------+-----------------+-----------+-------+--------------+------------------------+
# |accountname|accountproviderid|clustername|app_pmo|app_costcenter|line_item_unblended_cost|
# +-----------+-----------------+-----------+-------+--------------+------------------------+
# | account1| 100000000001| cluster1| 111111| 11111111| 12.05|
# | account2| 200000000001| cluster2| 222222| 22222222| 52.0|
# | account3| 300000000003| null| null| null| 12.03|
# +-----------+-----------------+-----------+-------+--------------+------------------------+