PySpark Filter between - 根据组提供上限和下限列表

PySpark Filter between - provide a list of upper and lower bounds, based on groups

我有一个 PySpark 数据框,想过滤上限和下限之间的行。 通常情况下,我只会使用带有 between:

的过滤器
import pandas as pd
from pyspark.sql import functions as F
... sql_context creation ...

pdfRaw=pd.DataFrame([{"vehicleID":'A', "Segment":'State Hwy', "speed":68.0},\
{"vehicleID":'B', "Segment":'State Hwy', "speed":76.0}])
dfRaw = sql_context.createDataFrame(pdfRaw).withColumn("vehicleID", "Segment", "speed")

dfRaw.show()

+-----------+------------+-----+
   vehicleID|     Segment|value|
+-----------+------------+-----+
|          A|   State Hwy| 68.0|
|          B|   State Hwy| 73.0|
+-----------+------------+-----+

dfRaw.filter(F.col("speed").between(70,75)).show()

+-----------+------------+-----+
   vehicleID|     Segment|value|
+-----------+------------+-----+
|          B|   State Hwy| 73.0|
+-----------+------------+-----+

但是我有多个速度值,我想在它们之间进行过滤。

Speeds_Curious = {
[25,30],
[55,60],
[60,65],
[70,75]
}

我真的想更进一步。过滤器之间的上限和下限取决于先前数据帧的分组结果。

df_RoadSegments.groupby('Segment')\
.agg(F.min('SpeedLimit').alias('minSpeed'),\
F.max('SpeedLimit').alias('maxSpeed'))\
.show()

+-----------+----------+----------+
     Segment|  minSpeed|  maxSpeed|
+-----------+----------+----------+
|      Urban|      25.0|      30.0|
|  State Hwy|      55.0|      60.0|
|I-State Hwy|      60.0|      65.0|
|I-State Hwy|      70.0|      75.0|
+-----------+----------+----------+

所以基本上我想在不同数据框上作为列可用的值之间过滤数据框。

类似于:

dfLimits = df_RoadSegments.groupby('Segment')\
.agg(F.min('SpeedLimit').alias('minSpeed'),\ F.max('SpeedLimit').alias('maxSpeed'))

dfRaw.groupby('Segment')\
.filter(F.col("speed")\
.between(dfLimits.where(dfLimits.Segment=="State Hwy"(??)).select('minSpeed')),\
dfLimits.where(dfLimits.Segment=="State Hwy"(??)).select('maxSpeed'))))\
.show()

有什么想法吗?

以下方法将为您提供属于特定路段的 minmax 速度之间的所有车辆。

您可以加​​入两个数据框:

df_joined = dfRaw.join(dfLimits, on="Segment", how="left")
+---------+---------+-----+--------+--------+
|  Segment|vehicleID|speed|minSpeed|maxSpeed|
+---------+---------+-----+--------+--------+
|State Hwy|        A| 68.0|      55|      60|
|State Hwy|        B| 76.0|      55|      60|
+---------+---------+-----+--------+--------+

如果你想进一步标记速度是否在上述范围内,那么你可以这样写:

flag_df = df_joined.withColumn("flag", F.when((F.col("speed") > F.col("minSpeed")) & (F.col("speed") < F.col("minSpeed")), 1).otherwise(0))
flag_df.show()
+---------+---------+-----+--------+--------+----+
|  Segment|vehicleID|speed|minSpeed|maxSpeed|flag|
+---------+---------+-----+--------+--------+----+
|State Hwy|        A| 68.0|      55|      60|   0|
|State Hwy|        B| 76.0|      55|      60|   0|
+---------+---------+-----+--------+--------+----+

然后您可以简单地过滤标志说:

df_final = df.filter(F.col("flag") == 1)