根据最近的记录加入pyspark

Join pyspark based on the most recent record

我需要加入这些数据帧:

df0:
+-------------
|id |quantity|
+-------------
|  a|       4|
|  b|       7|
|  c|       6|
|  d|       1|
+-------------
df1:
+------------------------
|id |order_id|order_date|
+------------------------
|  a|       x|2021-01-25|
|  a|       y|2021-01-23|
|  b|       z|2021-01-28|
|  b|       x|2021-01-20|
|  c|       y|2021-01-15|
|  d|       x|2021-01-18|
+------------------------

我想要得到的结果如下:

+----------------------------------
|id |quantity |order_id|order_date|
+----------------------------------
|  a|       4 |       x|2021-01-25|
|  b|       7 |       z|2021-01-28|
|  c|       6 |       y|2021-01-15|
|  d|       1 |       x|2021-01-18|
+----------------------------------

也就是说,我只需要根据order_date.

的最新记录加入

简单地在 id 上分组 df1 并聚合最大值 order_date 然后将结果与 df0:

连接
import pyspark.sql.functions as F

result = df0.join(
    df1.groupBy("id").agg(F.max("order_date").alias("order_date")),
    on=["id"]
)

result.show()
#+---+--------+----------+
#| id|quantity|order_date|
#+---+--------+----------+
#|  d|       1|2021-01-18|
#|  c|       6|2021-01-15|
#|  b|       7|2021-01-28|
#|  a|       4|2021-01-25|
#+---+--------+----------+
pyspark

#Importing Libs
import findspark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import*
findspark.init()

#creating Spark Session
spark = SparkSession.builder.appName("Test").getOrCreate()

#Data
data1=[{'id':'a', 'quantity':4},
{'id':'b', 'quantity':7},
{'id':'c', 'quantity':6},
{'id':'d', 'quantity':1}]

data2=[{'id':'a', 'order_id':'x', 'order_date':'2021-01-25'},
{'id':'a', 'order_id':'y', 'order_date':'2021-01-23'},
{'id':'b', 'order_id':'z', 'order_date':'2021-01-28'},
{'id':'b', 'order_id':'x', 'order_date':'2021-01-20'},
{'id':'c', 'order_id':'y', 'order_date':'2021-01-15'},
{'id':'d', 'order_id':'x', 'order_date':'2021-01-18'}
]

#creating dataframes
df0=spark.createDataFrame(data1)
df1=spark.createDataFrame(data2)
df0.show()
+---+--------+
| id|quantity|
+---+--------+
|  a|       4|
|  b|       7|
|  c|       6|
|  d|       1|
+---+--------+
df1.show()
+---+----------+--------+
| id|order_date|order_id|
+---+----------+--------+
|  a|2021-01-25|       x|
|  a|2021-01-23|       y|
|  b|2021-01-28|       z|
|  b|2021-01-20|       x|
|  c|2021-01-15|       y|
|  d|2021-01-18|       x|
+---+----------+--------+

#Arranging the order_date column by using Aggregate functions

dff1=df1.groupBy("id").agg(F.max("order_date").alias("order_date"))
dff1.show()
+---+----------+
| id|order_date|
+---+----------+
|  d|2021-01-18|
|  c|2021-01-15|
|  b|2021-01-28|
|  a|2021-01-25|
+---+----------+

#applying join and printing result
result=df0.join(dff1,df0['id']==df1['id'], 'inner')
result.show()

+---+--------+---+----------+
| id|quantity| id|order_date|
+---+--------+---+----------+
|  d|       1|  d|2021-01-18|
|  c|       6|  c|2021-01-15|
|  b|       7|  b|2021-01-28|
|  a|       4|  a|2021-01-25|
+---+--------+---+----------+