Join pyspark based on the most recent record
I need to join these dataframes:
df0:
+---+--------+
| id|quantity|
+---+--------+
|  a|       4|
|  b|       7|
|  c|       6|
|  d|       1|
+---+--------+
df1:
+---+--------+----------+
| id|order_id|order_date|
+---+--------+----------+
|  a|       x|2021-01-25|
|  a|       y|2021-01-23|
|  b|       z|2021-01-28|
|  b|       x|2021-01-20|
|  c|       y|2021-01-15|
|  d|       x|2021-01-18|
+---+--------+----------+
The result I want looks like this:
+---+--------+--------+----------+
| id|quantity|order_id|order_date|
+---+--------+--------+----------+
|  a|       4|       x|2021-01-25|
|  b|       7|       z|2021-01-28|
|  c|       6|       y|2021-01-15|
|  d|       1|       x|2021-01-18|
+---+--------+--------+----------+
That is, I only need to join on the most recent record per id, based on order_date.
Simply group df1 by id, aggregate the maximum order_date, then join the result with df0:
import pyspark.sql.functions as F
result = df0.join(
df1.groupBy("id").agg(F.max("order_date").alias("order_date")),
on=["id"]
)
result.show()
#+---+--------+----------+
#| id|quantity|order_date|
#+---+--------+----------+
#| d| 1|2021-01-18|
#| c| 6|2021-01-15|
#| b| 7|2021-01-28|
#| a| 4|2021-01-25|
#+---+--------+----------+
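Note that this result keeps only order_date and drops order_id, while the desired output includes both. One way to recover order_id, sketched below under the assumption that (id, order_date) uniquely identifies a row in df1 (true for the sample data), is to join the aggregated dates back to df1:

import pyspark.sql.functions as F

# Most recent order_date per id
latest = df1.groupBy("id").agg(F.max("order_date").alias("order_date"))

# Join back to df1 on (id, order_date) to recover order_id,
# then attach quantity from df0
result = (
    df0
    .join(latest, on="id")
    .join(df1, on=["id", "order_date"])
)
result.show()

If an id could have two orders on its latest date, this join would return both rows, so a tie-breaking rule would then be needed.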
# Importing libs
import findspark
findspark.init()  # run before importing pyspark in case it is not on sys.path

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Creating the Spark session
spark = SparkSession.builder.appName("Test").getOrCreate()
# Sample data
data1 = [{'id': 'a', 'quantity': 4},
         {'id': 'b', 'quantity': 7},
         {'id': 'c', 'quantity': 6},
         {'id': 'd', 'quantity': 1}]
data2 = [{'id': 'a', 'order_id': 'x', 'order_date': '2021-01-25'},
         {'id': 'a', 'order_id': 'y', 'order_date': '2021-01-23'},
         {'id': 'b', 'order_id': 'z', 'order_date': '2021-01-28'},
         {'id': 'b', 'order_id': 'x', 'order_date': '2021-01-20'},
         {'id': 'c', 'order_id': 'y', 'order_date': '2021-01-15'},
         {'id': 'd', 'order_id': 'x', 'order_date': '2021-01-18'}]

# Creating the dataframes
df0 = spark.createDataFrame(data1)
df1 = spark.createDataFrame(data2)
df0.show()
+---+--------+
| id|quantity|
+---+--------+
| a| 4|
| b| 7|
| c| 6|
| d| 1|
+---+--------+
df1.show()
+---+----------+--------+
| id|order_date|order_id|
+---+----------+--------+
| a|2021-01-25| x|
| a|2021-01-23| y|
| b|2021-01-28| z|
| b|2021-01-20| x|
| c|2021-01-15| y|
| d|2021-01-18| x|
+---+----------+--------+
# Keeping only the most recent order_date per id
dff1 = df1.groupBy("id").agg(F.max("order_date").alias("order_date"))
dff1.show()
+---+----------+
| id|order_date|
+---+----------+
| d|2021-01-18|
| c|2021-01-15|
| b|2021-01-28|
| a|2021-01-25|
+---+----------+
# Applying the join and printing the result
result = df0.join(dff1, df0['id'] == dff1['id'], 'inner')
result.show()
+---+--------+---+----------+
| id|quantity| id|order_date|
+---+--------+---+----------+
| d| 1| d|2021-01-18|
| c| 6| c|2021-01-15|
| b| 7| b|2021-01-28|
| a| 4| a|2021-01-25|
+---+--------+---+----------+
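Two caveats about this version: joining on the column expression df0['id'] == dff1['id'] keeps both id columns (visible in the output above), whereas on='id' would keep a single one, and the result still lacks order_id from the desired output. A window-function sketch that addresses both in one pass, ranking each id's orders by order_date and keeping only the newest row:

from pyspark.sql import Window
import pyspark.sql.functions as F

# Rank each id's orders from newest to oldest
w = Window.partitionBy("id").orderBy(F.col("order_date").desc())

latest_orders = (
    df1
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)  # keep only the newest order per id
    .drop("rn")
)

# Joining on the column name keeps a single id column
result = df0.join(latest_orders, on="id")

This yields one row per id with quantity, order_id, and order_date, matching the desired output. Ties on order_date would be broken arbitrarily by row_number; if that matters, add a secondary sort key to the window.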