Calculate distinct IDs for a specified time period
I have a Spark dataframe that looks like this:
| ID  | area_id | dob        | dod        |
|-----|---------|------------|------------|
| id1 | A       | 2000/09/10 | Null       |
| id2 | A       | 2001/09/28 | 2010/01/02 |
| id3 | B       | 2017/09/30 | Null       |
| id4 | B       | 2019/10/01 | 2020/12/10 |
| id5 | C       | 2005/10/08 | 2010/07/13 |
where `dob` is the date of birth and `dod` is the date of death.
I want to count the distinct number of `ID`s per `area_id` within a specific time period, where the period can be:
- a year (e.g. 2010, 2020, ...)
- a year and month (2010-01, 2020-12, ...)
- ...
This is not the same as simply aggregating by intervals, so I would appreciate any ideas on a more suitable approach.
- Replace the nulls with today's date --> keeps the table intact for now
- Use a `where` clause with `BETWEEN`, using the `expr` function so that you can reference the columns:
  `expr(" [the date in question] BETWEEN dob and dod ")`
- Group by `area_id`, `ID` (a sketch of these steps follows below)
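A minimal sketch of those steps, assuming the date columns are already `DateType`, the target date 2010-06-15 is just an example, and `countDistinct` is used in place of a final group on `(area_id, ID)`:
from pyspark.sql import functions as F
# Keep the table intact: fill missing dod with today's date
df_filled = df.withColumn("dod", F.coalesce(F.col("dod"), F.current_date()))
# BETWEEN via expr, so both date columns can be referenced directly
alive = df_filled.where(F.expr("to_date('2010-06-15') BETWEEN dob AND dod"))
# Distinct IDs per area_id for that date
alive.groupBy("area_id").agg(F.countDistinct("ID").alias("distinct_ids")).show()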
Given that your column schema is as follows:
types.StructField('ID', types.StringType())
types.StructField('area_id', types.StringType())
types.StructField('dob', types.DateType())
types.StructField('dod', types.DateType())
you can use the pyspark.sql functions as below:
from pyspark.sql import functions
#by month
df.groupBy(df["area_id"], functions.month(df["dob"])).count()
#by quarter
df.groupBy(df["area_id"], functions.quarter(df["dob"])).count()
#by year
df.groupBy(df["area_id"], functions.year(df["dob"])).count()
#by year and month
df.groupBy(df["area_id"], functions.year(df["dob"]), functions.month(df["dob"])).count()
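Since the question asks for the number of distinct IDs rather than row counts, `count()` could be swapped for a `countDistinct` aggregation along the same lines (a small sketch; the alias name is just an example):
#distinct IDs by area and year
df.groupBy(df["area_id"], functions.year(df["dob"])).agg(functions.countDistinct(df["ID"]).alias("distinct_ids"))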
The first step is to find the records that match an arbitrary time period, then group on `area_id` and apply `collect_set` to the matching rows.
I use an extensible, lambda-based system that can be extended to arbitrary time-period notations. In my example I cover the notations used as examples in your question; I decompose `year-month` into a condition that specifies both `year` and `month`.
I have modified the input data to include cases that better illustrate the idea.
Step 1
from pyspark.sql import functions as F
data = [("id1", "A", "2000/09/10", "2021/11/10"),
("id2", "A", "2001/09/28", "2020/10/02",),
("id3", "B", "2017/09/30", None),
("id4", "B", "2017/10/01", "2020/12/10",),
("id5", "C", "2005/10/08", "2010/07/13",), ]
df = spark.createDataFrame(data, ("ID", "area_id", "dob", "dod",))\
.withColumn("dob", F.to_date("dob", "yyyy/MM/dd"))\
.withColumn("dod", F.to_date("dod", "yyyy/MM/dd"))
df.show()
#+---+-------+----------+----------+
#| ID|area_id| dob| dod|
#+---+-------+----------+----------+
#|id1| A|2000-09-10|2021-11-10|
#|id2| A|2001-09-28|2020-10-02|
#|id3| B|2017-09-30| null|
#|id4| B|2017-10-01|2020-12-10|
#|id5| C|2005-10-08|2010-07-13|
#+---+-------+----------+----------+
# Map of supported extractors
extractor_map = {"quarter": F.quarter, "month": F.month, "year": F.year}
# Specify conditions using the extractors defined above
# Find rows such that 2019-10 lies between `dob` and `dod`
conditions = {"month": 10, "year": 2019}
# Iterate through the conditions and, in each iteration, extend the
# conditional expression with the result of evaluating the condition
# after extracting the value with the appropriate extractor.
# The extractors are not `null` safe and will evaluate to `null`;
# depending on how you want to handle nulls, you can modify the condition.
conditional_expression = F.lit(True)
for term, condition in conditions.items():
    extractor = extractor_map[term]
    conditional_expression = conditional_expression & F.lit(condition).between(extractor("dob"), extractor("dod"))
condition_example = df.withColumn("include", conditional_expression)
condition_example.show()
#+---+-------+----------+----------+-------+
#| ID|area_id| dob| dod|include|
#+---+-------+----------+----------+-------+
#|id1| A|2000-09-10|2021-11-10| true|
#|id2| A|2001-09-28|2020-10-02| true|
#|id3| B|2017-09-30| null| null|
#|id4| B|2017-10-01|2020-12-10| true|
#|id5| C|2005-10-08|2010-07-13| false|
#+---+-------+----------+----------+-------+
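The `conditions` dictionary above is hard-coded; to support arbitrary period notations as described, it could be built from a period string with a small helper along these lines (a hypothetical sketch, not part of the original answer):
def parse_period(period):
    # Parse "2019" or "2019-10" into the conditions dict used above
    # (hypothetical helper; extend it for other notations, e.g. quarters)
    parts = period.split("-")
    conditions = {"year": int(parts[0])}
    if len(parts) > 1:
        conditions["month"] = int(parts[1])
    return conditions

parse_period("2019-10")  # {'year': 2019, 'month': 10}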
Step 2
# Filter rows that match the condition
df_to_group = condition_example.filter(F.col("include") == True)
# Grouping on `area_id` and collecting distinct `ID`
df_to_group.groupBy("area_id").agg(F.collect_set("ID")).show()
Output
+-------+---------------+
|area_id|collect_set(ID)|
+-------+---------------+
| B| [id4]|
| A| [id2, id1]|
+-------+---------------+
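If you need the number of distinct IDs rather than the sets themselves, the same filtered frame can be aggregated with `countDistinct` (which would give 2 for A and 1 for B here; the alias is just an example name):
df_to_group.groupBy("area_id").agg(F.countDistinct("ID").alias("distinct_ids")).show()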