需要应用多个动态条件时如何过滤pyspark数据框

How filter pyspark dataframe when multiple dynamic criteria needs to be applied

我想根据 DfCriteria 数据帧中的条件过滤数据帧 df

df = spark.createDataFrame([
Row(ID ="A01", Date=date(2021, 1, 1), SalesUSD=324),
Row(ID ="A01", Date=date(2021, 1, 2), SalesUSD=567),
Row(ID ="A01", Date=date(2021, 1, 3), SalesUSD=645),
Row(ID ="A01", Date=date(2021, 1, 4), SalesUSD=834),
Row(ID ="A02", Date=date(2021, 1, 1), SalesUSD=284),
Row(ID ="A02", Date=date(2021, 1, 2), SalesUSD=453),
Row(ID ="A02", Date=date(2021, 1, 3), SalesUSD=132),
Row(ID ="A04", Date=date(2021, 1, 4), SalesUSD=254)
]) 
|ID |  Date      |  SalesUSD |
|---|------------|-----------|
|A01|  2021-01-01|     324   |
|A01|  2021-01-02|     567   |
|A01|  2021-01-03|     645   |
|A01|  2021-01-04|     834   |
|A02|  2021-01-01|     284   |
|A02|  2021-01-02|     453   |
|A02|  2021-01-03|     132   |
|A04|  2021-01-04|     254   |

DF 标准:

DfCriteria = spark.createDataFrame([
Row(ID ="A01", StartDate=date(2021, 1, 1), EndDate=date(2021, 1, 2)),
Row(ID ="A02", StartDate=date(2021, 1, 2), EndDate=date(2021, 1, 4))])
|ID |StartDate  | EndDate    |
|---|-----------|------------|
|A01| 2021-01-01| 2021-01-02 |
|A02| 2021-01-02| 2021-01-04 |

预期输出

|ID |  Date      |  SalesUSD |
|---|------------|-----------|
|A01|  2021-01-01|     324   |
|A01|  2021-01-02|     567   |
|A02|  2021-01-02|     453   |
|A02|  2021-01-03|     132   |
|A04|  2021-01-04|     254   |

假设您只想保留开始日期和结束日期之间的事件:

df.join(DfCriteria, on='ID', how='left').where(F.col('Date').between('StartDate', 'EndDate'))

您可以使用left_semi加入:

from pyspark.sql import functions as F

df = spark.createDataFrame([
    ("A01", "2021-01-01", 324), ("A01", "2021-01-02", 567), ("A01", "2021-01-03", 645),
    ("A01", "2021-01-04", 834), ("A02", "2021-01-01", 284), ("A02", "2021-01-02", 453),
    ("A02", "2021-01-03", 132), ("A04", "2021-01-04", 254)], ["ID", "Date", "SalesUSD"])

DfCriteria = spark.createDataFrame([
    ("A01", "2021-01-01", "2021-01-02"), ("A02", "2021-01-02", "2021-01-04")
], ["ID", "StartDate", "EndDate"])

result = df.join(
    DfCriteria,
    (df["ID"] == DfCriteria["ID"]) & F.col("Date").between(F.col("StartDate"), F.col("EndDate")),
    'left_semi'
)

result.show()

# +---+----------+--------+
# | ID|      Date|SalesUSD|
# +---+----------+--------+
# |A01|2021-01-01|     324|
# |A01|2021-01-02|     567|
# |A02|2021-01-02|     453|
# |A02|2021-01-03|     132|
# +---+----------+--------+