PySpark: merge 2 dataframes without losing data

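I am looking to join 2 PySpark dataframes without losing any data from either one. The easiest way to explain is with an example. Ideally the two counts would also be added together and the result sorted. If the desktop or phone column is null for a query, it should equal 0 in the output.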


desktop_df.join(phone_df, on='query')\
          .fillna(0).orderBy("desktop", ascending=False)\
          .show()


But this approach doesn't seem to work: no zeros show up at all.


desktop_df:

query  |desktop|
query1 |  12   |
query2 |  23   |
query3 |  8    |
query4 |  11   |
query6 |  45   |
query9 |  89   |


phone_df:

query  | phone |
query1 |  21   |
query2 |  33   |
query4 |  11   |
query5 |  55   |
query6 |  45   |
query7 | 1234  |
query8 | 4321  |
query10|  10   |
query11|  1    |


Expected output:

query  | desktop| phone  | total |
query8 |   0    |  4321  | 4321  |
query7 |   0    |  1234  | 1234  |
query6 |   45   |   45   |  90   |
query9 |   89   |   0    |  89   |
query2 |   23   |   33   |  56   |
query5 |   0    |   55   |  55   |
query1 |   12   |   21   |  33   |
query4 |   11   |   11   |  22   |
query10|   0    |   10   |  10   |
query3 |   8    |   0    |  8    |
query11|   0    |   1    |  1    |


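from pyspark.sql.functions import col

# show() returns None, so keep the dataframe in df and display it separately
df = desktop_df.join(phone_df, on=["query"], how='fullouter').fillna(0).withColumn("total", col("desktop") + col("phone"))
df.show(200)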
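
The attempt in the question calls join without a how argument, which in PySpark means an inner join. The sketch below is plain Python (no Spark involved) and only models the two join types on the sample data from the question, to show why the inner join never produces a null for fillna(0) to fill. The names desktop, phone, inner_keys and full_rows are made up for this illustration.

```python
# Sample data copied from the question, as query -> count mappings.
desktop = {"query1": 12, "query2": 23, "query3": 8,
           "query4": 11, "query6": 45, "query9": 89}
phone = {"query1": 21, "query2": 33, "query4": 11, "query5": 55, "query6": 45,
         "query7": 1234, "query8": 4321, "query10": 10, "query11": 1}

# Inner join: only queries present on BOTH sides survive, so every row
# already has both counts and there is nothing to replace with 0.
inner_keys = sorted(desktop.keys() & phone.keys())

# Full outer join: every query from EITHER side survives. A missing side
# becomes 0 here, which is the role fillna(0) plays after the Spark join.
full_rows = [
    (q, desktop.get(q, 0), phone.get(q, 0), desktop.get(q, 0) + phone.get(q, 0))
    for q in desktop.keys() | phone.keys()
]
# Sort by total, largest first, like orderBy(col("total").desc()).
full_rows.sort(key=lambda row: row[3], reverse=True)

print(inner_keys)   # the 4 queries an inner join would keep
for row in full_rows:
    print(row)      # 11 rows: (query, desktop, phone, total)
```

Only 4 of the 11 queries appear in both dataframes, which is why the inner join returns so few rows and no zeros.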

# Note: this max is the Spark aggregate and shadows Python's built-in max
from pyspark.sql.functions import col, lit, max

desktop_df = df.filter("hwType == 'DESKTOP'").groupby("query").count().orderBy("count", ascending=False).withColumnRenamed('count','desktop')
phone_df = df.filter("hwType == 'PHONE'").groupby("query").count().orderBy("count", ascending=False).withColumnRenamed('count','phone')

# add missing column to each dataframe
desktop_df = desktop_df.withColumn('phone', lit(0)).select('query', 'desktop', 'phone')
phone_df = phone_df.withColumn('desktop', lit(0)).select('query', 'desktop', 'phone')

# union all and agg to select max value (the real count beats the 0 placeholder)
phone_df.unionAll(desktop_df) \
    .groupBy('query').agg(max(col('desktop')).alias('desktop'), max(col('phone')).alias('phone')) \
    .withColumn('total', col('desktop') + col('phone')) \
    .orderBy(col('total').desc()) \
    .show()

You can try a full outer join on the query column, then get the "total" by adding the two count columns.

# Joining on the column name yields one query column that is never null;
# selecting desktop_df.query instead would be null for phone-only rows
df = desktop_df.join(phone_df, on="query", how="full").fillna(0).withColumn("total", col("count1") + col("count2"))

You can use unionAll and then groupBy:


# Note: this max is the Spark aggregate and shadows Python's built-in max
from pyspark.sql.functions import col, lit, max

desktop_data = [("query1", 12), ("query2", 23), ("query3", 8),
                ("query4", 11), ("query6", 45), ("query9", 89)]

phone_data = [("query1", 21), ("query2", 33), ("query4", 11), ("query5", 55), ("query6", 45),
             ("query7", 1234), ("query8", 4321), ("query10", 10), ("query11", 1)]

# spark is the active SparkSession (predefined in the pyspark shell)
desktop_df = spark.createDataFrame(desktop_data, ['query', 'count1'])
phone_df = spark.createDataFrame(phone_data, ['query', 'count2'])

# add missing column to each dataframe so both have the same schema
desktop_df = desktop_df.withColumn('count2', lit(0)).select('query', 'count1', 'count2')
phone_df = phone_df.withColumn('count1', lit(0)).select('query', 'count1', 'count2')

# union all and agg to select max value (the real count beats the 0 placeholder)
phone_df.unionAll(desktop_df) \
   .groupBy('query').agg(max(col('count1')).alias('count1'), max(col('count2')).alias('count2')) \
   .withColumn('total', col('count1') + col('count2')) \
   .orderBy(col('total').desc()) \
   .show()

+-------+------+------+-----+
|  query|count1|count2|total|
+-------+------+------+-----+
| query8|     0|  4321| 4321|
| query7|     0|  1234| 1234|
| query6|    45|    45|   90|
| query9|    89|     0|   89|
| query2|    23|    33|   56|
| query5|     0|    55|   55|
| query1|    12|    21|   33|
| query4|    11|    11|   22|
|query10|     0|    10|   10|
| query3|     8|     0|    8|
|query11|     0|     1|    1|
+-------+------+------+-----+