Find the row value from which the minimum value was extracted over Window.partitionBy in PySpark
I have a PySpark dataframe like this:
+--------+-------------+--------------+-----------------------+
|material|purchase_date|mkt_prc_usd_lb|min_mkt_prc_over_1month|
+--------+-------------+--------------+-----------------------+
| Copper| 2019-01-09| 2.6945| 2.6838|
| Copper| 2019-01-23| 2.6838| 2.6838|
| Zinc| 2019-01-23| 1.1829| 1.1829|
| Zinc| 2019-06-26| 1.1918| 1.1918|
|Aluminum| 2019-01-02| 0.8363| 0.8342|
|Aluminum| 2019-01-09| 0.8342| 0.8342|
|Aluminum| 2019-01-23| 0.8555| 0.8342|
|Aluminum| 2019-04-03| 0.8461| 0.8461|
+--------+-------------+--------------+-----------------------+
The last column, 'min_mkt_prc_over_1month', is computed as the minimum of 'mkt_prc_usd_lb' (the third column) over a one-month window per material, i.e. from -15 days to +15 days around each purchase_date.
The code is:
from pyspark.sql.functions import col
from pyspark.sql.window import Window

days = lambda i: i * 86400  # convert days to seconds for rangeBetween

w2 = (Window()
      .partitionBy("material")
      .orderBy(col("purchase_date").cast("timestamp").cast("long"))
      .rangeBetween(-days(15), days(15)))
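For context, 'min_mkt_prc_over_1month' is presumably derived by applying a windowed min over w2; a minimal sketch (assuming the dataframe is named df, this step is not shown in the original question):
from pyspark.sql.functions import min as min_

# Rolling minimum of the price over the +/- 15-day window per material
df = df.withColumn("min_mkt_prc_over_1month",
                   min_("mkt_prc_usd_lb").over(w2))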
Now, I want to find the 'purchase_date' on which the price was/will be at its minimum.
Expected output (showing the first two rows):
+--------+-------------+--------------+-----------------------+------------------+
|material|purchase_date|mkt_prc_usd_lb|min_mkt_prc_over_1month|date_of_min_price |
+--------+-------------+--------------+-----------------------+------------------+
| Copper| 2019-01-09| 2.6945| 2.6838| 2019-01-23|
| Copper| 2019-01-23| 2.6838| 2.6838| 2019-01-23|
+--------+-------------+--------------+-----------------------+------------------+
Try this. We can create a column that is populated with purchase_date wherever the two prices are equal, and null otherwise. Then we can take first with ignoreNulls=True over that newly created column, using our window w2:
from pyspark.sql.functions import col, expr, first
from pyspark.sql.window import Window

days = lambda i: i * 86400  # convert days to seconds for rangeBetween

w2 = (Window()
      .partitionBy("material")
      .orderBy(col("purchase_date").cast("timestamp").cast("long"))
      .rangeBetween(-days(15), days(15)))

# Keep purchase_date only where the price equals the window minimum (null otherwise),
# then take the first non-null date over the same window.
df.withColumn("first",
              expr("IF(mkt_prc_usd_lb = min_mkt_prc_over_1month, purchase_date, null)"))\
  .withColumn("date_of_min_price", first("first", ignorenulls=True).over(w2))\
  .drop("first")\
  .show()
#+--------+-------------+--------------+-----------------------+-----------------+
#|material|purchase_date|mkt_prc_usd_lb|min_mkt_prc_over_1month|date_of_min_price|
#+--------+-------------+--------------+-----------------------+-----------------+
#| Copper| 2019-01-09| 2.6945| 2.6838| 2019-01-23|
#| Copper| 2019-01-23| 2.6838| 2.6838| 2019-01-23|
#| Zinc| 2019-01-23| 1.1829| 1.1829| 2019-01-23|
#| Zinc| 2019-06-26| 1.1918| 1.1918| 2019-06-26|
#|Aluminum| 2019-01-02| 0.8363| 0.8342| 2019-01-09|
#|Aluminum| 2019-01-09| 0.8342| 0.8342| 2019-01-09|
#|Aluminum| 2019-01-23| 0.8555| 0.8342| 2019-01-09|
#|Aluminum| 2019-04-03| 0.8461| 0.8461| 2019-04-03|
#+--------+-------------+--------------+-----------------------+-----------------+
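As a side note, the same result could likely be obtained in one step by taking the minimum of a (price, date) struct over the same window. This is an alternative sketch, not part of the original answer, and it reuses df and w2 from above:
from pyspark.sql.functions import struct, min as min_

# Structs compare field by field, so the minimum struct carries the minimal
# price and, on ties, the earliest purchase_date within the window.
df.withColumn("date_of_min_price",
              min_(struct("mkt_prc_usd_lb", "purchase_date"))
              .over(w2)
              .getField("purchase_date"))\
  .show()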