Pyspark merge 2 dataframes without losing data
I'm looking to join 2 pyspark dataframes without losing any data. The easiest way to explain is with an example. The counts should also be added together and the result sorted. If the desktop or phone column is empty for a query, it should show up as 0 in the output.
What I tried:
desktop_df.join(phone_df, on='query')\
.fillna(0).orderBy("desktop", ascending=False)\
.show(20)
(there is no total column yet, so I'm ordering by the desktop count for now)
But this doesn't seem to work: the rows with zeros never show up at all.
desktop_df:
query  |desktop|
----------------
query1 | 12    |
query2 | 23    |
query3 | 8     |
query4 | 11    |
query6 | 45    |
query9 | 89    |
phone_df:
query  | phone |
----------------
query1 | 21    |
query2 | 33    |
query4 | 11    |
query5 | 55    |
query6 | 45    |
query7 | 1234  |
query8 | 4321  |
query10| 10    |
query11| 1     |
The output I'm looking for:
query  | desktop| phone | total |
--------------------------------
query8 | 0      | 4321  | 4321  |
query7 | 0      | 1234  | 1234  |
query6 | 45     | 45    | 90    |
query9 | 89     | 0     | 89    |
query2 | 23     | 33    | 56    |
query5 | 0      | 55    | 55    |
query1 | 12     | 21    | 33    |
query4 | 11     | 11    | 22    |
query10| 0      | 10    | 10    |
query3 | 8      | 0     | 8     |
query11| 0      | 1     | 1     |
The solution I was looking for:
from pyspark.sql.functions import col

# full outer join keeps queries that only appear in one of the two dataframes
df = desktop_df.join(phone_df, on=["query"], how='fullouter') \
    .fillna(0) \
    .withColumn("total", col("desktop") + col("phone"))
df.show(200)
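The desired output above is also sorted by total in descending order; that only needs an extra orderBy on the same result (same column names, nothing new assumed):
df.orderBy(col("total").desc()).show(200)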
or
from pyspark.sql.functions import lit, col, max

# df is the raw dataframe with a hwType and a query column
desktop_df = df.filter("hwType == 'DESKTOP'").groupby("query").count() \
    .withColumnRenamed('count', 'desktop')
phone_df = df.filter("hwType == 'PHONE'").groupby("query").count() \
    .withColumnRenamed('count', 'phone')

# add the missing column to each dataframe so both have the same schema
desktop_df = desktop_df.withColumn('phone', lit(0)).select('query', 'desktop', 'phone')
phone_df = phone_df.withColumn('desktop', lit(0)).select('query', 'desktop', 'phone')

# union all, aggregate with max to collapse the padded zeros, then add the total
phone_df.unionAll(desktop_df) \
    .groupBy('query').agg(max(col('desktop')).alias('desktop'), max(col('phone')).alias('phone')) \
    .withColumn('total', col('desktop') + col('phone')) \
    .orderBy(col('total').desc()) \
    .show()
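A note on the design choice (my reading, not stated above): max works because each query occurs at most once per dataframe and the padded column is always 0, so the real count is always the larger value. Under that same assumption, summing gives the identical result, for example:
from pyspark.sql.functions import col, sum as sum_

# sum works too: the padded side always contributes 0
phone_df.unionAll(desktop_df) \
    .groupBy('query') \
    .agg(sum_('desktop').alias('desktop'), sum_('phone').alias('phone')) \
    .withColumn('total', col('desktop') + col('phone')) \
    .orderBy(col('total').desc()) \
    .show()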
You can try a full outer join on the query column, and get the "total" by adding the column values.
from pyspark.sql.functions import col, coalesce

# coalesce picks the query value from whichever side of the full join is not null
df = desktop_df.join(phone_df, desktop_df.query == phone_df.query, "full") \
    .select(coalesce(desktop_df.query, phone_df.query).alias("query"), "count1", "count2") \
    .fillna(0).withColumn("total", col("count1") + col("count2"))
You can use unionAll followed by groupBy.
Example:
desktop_data = [("query1", 12), ("query2", 23), ("query3", 8),
("query4", 11), ("query6", 45), ("query9", 89)]
phone_data = [("query1", 21), ("query2", 33), ("query4", 11), ("query5", 55), ("query6", 45),
("query7", 1234), ("query8", 4321), ("query10", 10), ("query11", 1)]
desktop_df = spark.createDataFrame(desktop_data, ['query', 'count1'])
phone_df = spark.createDataFrame(phone_data, ['query', 'count2'])
# add missing column to each dataframe
desktop_df = desktop_df.withColumn('count2', lit(0)).select('query', 'count1', 'count2')
phone_df = phone_df.withColumn('count1', lit(0)).select('query', 'count1', 'count2')
# union all and agg to select max value
phone_df.unionAll(desktop_df) \
.groupBy('query').agg(max(col('count1')).alias('count1'), max(col('count2')).alias('count2')) \
.withColumn('total', col('count1') + col('count2')) \
.orderBy(col('total').desc()) \
.show()
+-------+------+------+-----+
| query|count1|count2|total|
+-------+------+------+-----+
| query8| 0| 4321| 4321|
| query7| 0| 1234| 1234|
| query6| 45| 45| 90|
| query9| 89| 0| 89|
| query2| 23| 33| 56|
| query5| 0| 55| 55|
| query1| 12| 21| 33|
| query4| 11| 11| 22|
|query10| 0| 10| 10|
| query3| 8| 0| 8|
|query11| 0| 1| 1|
+-------+------+------+-----+
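If you are on Spark 3.1 or newer, unionByName with allowMissingColumns=True can replace the manual lit(0) padding; a minimal sketch of the same idea, reusing desktop_data and phone_data from above (this variant is mine, not part of the answer):
from pyspark.sql.functions import col, max

desktop_df = spark.createDataFrame(desktop_data, ['query', 'count1'])
phone_df = spark.createDataFrame(phone_data, ['query', 'count2'])

# columns missing on either side become NULL, which fillna(0) then turns into 0
desktop_df.unionByName(phone_df, allowMissingColumns=True) \
    .fillna(0) \
    .groupBy('query') \
    .agg(max(col('count1')).alias('count1'), max(col('count2')).alias('count2')) \
    .withColumn('total', col('count1') + col('count2')) \
    .orderBy(col('total').desc()) \
    .show()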