How to filter a PySpark DataFrame when multiple dynamic criteria need to be applied
I want to filter the DataFrame df based on the criteria in the DfCriteria DataFrame:
from datetime import date
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(ID="A01", Date=date(2021, 1, 1), SalesUSD=324),
    Row(ID="A01", Date=date(2021, 1, 2), SalesUSD=567),
    Row(ID="A01", Date=date(2021, 1, 3), SalesUSD=645),
    Row(ID="A01", Date=date(2021, 1, 4), SalesUSD=834),
    Row(ID="A02", Date=date(2021, 1, 1), SalesUSD=284),
    Row(ID="A02", Date=date(2021, 1, 2), SalesUSD=453),
    Row(ID="A02", Date=date(2021, 1, 3), SalesUSD=132),
    Row(ID="A04", Date=date(2021, 1, 4), SalesUSD=254)
])
|ID | Date | SalesUSD |
|---|------------|-----------|
|A01| 2021-01-01| 324 |
|A01| 2021-01-02| 567 |
|A01| 2021-01-03| 645 |
|A01| 2021-01-04| 834 |
|A02| 2021-01-01| 284 |
|A02| 2021-01-02| 453 |
|A02| 2021-01-03| 132 |
|A04| 2021-01-04| 254 |
DfCriteria:
DfCriteria = spark.createDataFrame([
    Row(ID="A01", StartDate=date(2021, 1, 1), EndDate=date(2021, 1, 2)),
    Row(ID="A02", StartDate=date(2021, 1, 2), EndDate=date(2021, 1, 4))])
|ID |StartDate | EndDate |
|---|-----------|------------|
|A01| 2021-01-01| 2021-01-02 |
|A02| 2021-01-02| 2021-01-04 |
Expected output:
|ID | Date | SalesUSD |
|---|------------|-----------|
|A01| 2021-01-01| 324 |
|A01| 2021-01-02| 567 |
|A02| 2021-01-02| 453 |
|A02| 2021-01-03| 132 |
|A04| 2021-01-04| 254 |
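The expected output suggests this rule: an ID with no row in DfCriteria (A04) keeps all of its rows, and otherwise a row is kept when its Date falls inside the inclusive StartDate to EndDate range. That is an inference from the sample, not something the question states. A plain-Python model of the rule (no Spark; it also assumes at most one date range per ID) reproduces the expected table and can serve as a reference when checking a Spark query:

```python
from datetime import date

# Sample data from the question as plain tuples: (ID, Date, SalesUSD)
sales = [
    ("A01", date(2021, 1, 1), 324), ("A01", date(2021, 1, 2), 567),
    ("A01", date(2021, 1, 3), 645), ("A01", date(2021, 1, 4), 834),
    ("A02", date(2021, 1, 1), 284), ("A02", date(2021, 1, 2), 453),
    ("A02", date(2021, 1, 3), 132), ("A04", date(2021, 1, 4), 254),
]

# Assumption: at most one (StartDate, EndDate) range per ID, both bounds inclusive
criteria = {
    "A01": (date(2021, 1, 1), date(2021, 1, 2)),
    "A02": (date(2021, 1, 2), date(2021, 1, 4)),
}

def keep(row):
    """Keep a row if its ID has no criteria, or if its Date lies in [StartDate, EndDate]."""
    row_id, day, _ = row
    if row_id not in criteria:
        return True
    start, end = criteria[row_id]
    return start <= day <= end

expected = [row for row in sales if keep(row)]
for row in expected:
    print(row)
```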
Assuming you only want to keep the events that fall between the start date and the end date:
from pyspark.sql import functions as F
df.join(DfCriteria, on='ID', how='left').where(F.col('Date').between(F.col('StartDate'), F.col('EndDate')))
You can use a left_semi join:
from pyspark.sql import functions as F
df = spark.createDataFrame([
("A01", "2021-01-01", 324), ("A01", "2021-01-02", 567), ("A01", "2021-01-03", 645),
("A01", "2021-01-04", 834), ("A02", "2021-01-01", 284), ("A02", "2021-01-02", 453),
("A02", "2021-01-03", 132), ("A04", "2021-01-04", 254)], ["ID", "Date", "SalesUSD"])
DfCriteria = spark.createDataFrame([
("A01", "2021-01-01", "2021-01-02"), ("A02", "2021-01-02", "2021-01-04")
], ["ID", "StartDate", "EndDate"])
result = df.join(
DfCriteria,
(df["ID"] == DfCriteria["ID"]) & F.col("Date").between(F.col("StartDate"), F.col("EndDate")),
'left_semi'
)
result.show()
# +---+----------+--------+
# | ID| Date|SalesUSD|
# +---+----------+--------+
# |A01|2021-01-01| 324|
# |A01|2021-01-02| 567|
# |A02|2021-01-02| 453|
# |A02|2021-01-03| 132|
# +---+----------+--------+
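A left_semi join returns only the columns of df and emits each df row at most once, however many DfCriteria rows match it; rows with no match at all (A04 here) are dropped. That matters if the criteria table can hold several overlapping ranges for the same ID, where an inner join plus a filter would duplicate rows. The plain-Python model below (no Spark; the overlapping criteria are hypothetical, not from the question) contrasts the two. Like the answer, it compares ISO yyyy-MM-dd strings, which sort in date order:

```python
# Hypothetical data: two overlapping ranges for A01 (not in the question)
sales = [("A01", "2021-01-01", 324), ("A01", "2021-01-02", 567), ("A01", "2021-01-03", 645)]
criteria = [("A01", "2021-01-01", "2021-01-02"), ("A01", "2021-01-02", "2021-01-03")]

def matches(row, crit):
    """Same predicate as the join condition: equal IDs and Date within [StartDate, EndDate]."""
    return row[0] == crit[0] and crit[1] <= row[1] <= crit[2]

# Inner join: one output row per matching (sales, criteria) pair
inner = [row for row in sales for crit in criteria if matches(row, crit)]

# Left semi join: each sales row at most once, kept if any criteria row matches
semi = [row for row in sales if any(matches(row, crit) for crit in criteria)]

print(len(inner), len(semi))  # 4 3
```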