Calculate distinct IDs for a specified time period

I have a Spark DataFrame like the one shown below:

ID   area_id  dob         dod
id1  A        2000/09/10  Null
id2  A        2001/09/28  2010/01/02
id3  B        2017/09/30  Null
id4  B        2019/10/01  2020/12/10
id5  C        2005/10/08  2010/07/13

where dob is the date of birth and dod is the date of death.

I want to count the number of distinct IDs per area_id within a specific time period, where the period could be, for example, a year, a quarter, a month, or a year-month combination such as 2019-10.

This is different from aggregating by intervals, so I would appreciate any ideas on a more suitable approach.

  1. Replace the nulls with today's date --> keep the original table around for now
  2. Use a where clause with BETWEEN; use the expr function so that you can reference the columns: expr("[the date in question] BETWEEN dob AND dod") (see the sketch after this list)
  3. Group by area_id, ID
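
A minimal sketch of those three steps, assuming df is the DataFrame above and check_date is a hypothetical date inside the period of interest:

from pyspark.sql import functions as F

check_date = "2019-10-15"  # hypothetical: any date inside the period of interest

# 1. replace null dod with today's date
# 2. keep rows where the date in question lies between dob and dod
# 3. group and count distinct IDs per area_id
counts = (df.withColumn("dod", F.coalesce("dod", F.current_date()))
            .where(F.expr(f"DATE'{check_date}' BETWEEN dob AND dod"))
            .groupBy("area_id")
            .agg(F.countDistinct("ID").alias("distinct_ids")))
counts.show()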

Assuming your columns have the following schema:

from pyspark.sql import types

schema = types.StructType([
    types.StructField('ID', types.StringType()),
    types.StructField('area_id', types.StringType()),
    types.StructField('dob', types.DateType()),
    types.StructField('dod', types.DateType()),
])

you can use pyspark.sql functions as follows:

from pyspark.sql import functions

#by month
df.groupBy(df["area_id"], functions.month(df["dob"])).count()

#by quarter
df.groupBy(df["area_id"], functions.quarter(df["dob"])).count()

#by year
df.groupBy(df["area_id"], functions.year(df["dob"])).count()

#by year and month
df.groupBy(df["area_id"], functions.year(df["dob"]), functions.month(df["dob"])).count()

The idea is to first find the records matching an arbitrary time period, and then, after grouping on area_id, apply collect_set to the matching rows.

I use an extensible lambda-based system that can be extended to arbitrary time-period notations. In my example I cover the notations used as examples in your question, decomposing year-month into a condition where both year and month are specified.

I have modified the input to include cases that better illustrate the idea.

Step 1

from pyspark.sql import functions as F

data = [("id1", "A", "2000/09/10", "2021/11/10"),
        ("id2", "A", "2001/09/28", "2020/10/02",),
        ("id3", "B", "2017/09/30", None),
        ("id4", "B", "2017/10/01", "2020/12/10",),
        ("id5", "C", "2005/10/08", "2010/07/13",), ]

df = spark.createDataFrame(data, ("ID", "area_id", "dob", "dod",))\
          .withColumn("dob", F.to_date("dob", "yyyy/MM/dd"))\
          .withColumn("dod", F.to_date("dod", "yyyy/MM/dd"))

df.show()
#+---+-------+----------+----------+
#| ID|area_id|       dob|       dod|
#+---+-------+----------+----------+
#|id1|      A|2000-09-10|2021-11-10|
#|id2|      A|2001-09-28|2020-10-02|
#|id3|      B|2017-09-30|      null|
#|id4|      B|2017-10-01|2020-12-10|
#|id5|      C|2005-10-08|2010-07-13|
#+---+-------+----------+----------+

# Map of supported extractors
extractor_map = {"quarter": F.quarter, "month": F.month, "year": F.year}

# specify conditions using extractors defined
# Find rows such that the 2019-10 lies between `dob` and `dod` 
conditions = {"month": 10, "year": 2019}


# Iterate through the conditions; in each iteration, extend the
# conditional expression with the result of evaluating the condition
# against the value extracted by the appropriate extractor

# The extractors are not `null` safe and will evaluate to `null`;
# depending on how you want to handle nulls, you can modify the condition
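# Note: each component is compared independently, so a span crossing a
# year boundary (e.g. dob 2018-12-01, dod 2020-02-01) fails the month
# check for 2019-10 even though the period lies inside the span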
conditional_expression = F.lit(True)

for term, condition in conditions.items():
    extractor = extractor_map[term]
    conditional_expression = (conditional_expression) & (F.lit(condition).between(extractor("dob"), extractor("dod")))
    
condition_example = df.withColumn("include", conditional_expression)

condition_example.show()

#+---+-------+----------+----------+-------+
#| ID|area_id|       dob|       dod|include|
#+---+-------+----------+----------+-------+
#|id1|      A|2000-09-10|2021-11-10|   true|
#|id2|      A|2001-09-28|2020-10-02|   true|
#|id3|      B|2017-09-30|      null|   null|
#|id4|      B|2017-10-01|2020-12-10|   true|
#|id5|      C|2005-10-08|2010-07-13|  false|
#+---+-------+----------+----------+-------+
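
The extractor map can be extended to other period notations in the same way; for example, a hypothetical extension using pyspark's weekofyear:

# Hypothetical extension: ISO week numbers as a period notation
extractor_map["week"] = F.weekofyear
conditions = {"week": 41, "year": 2019}  # the week containing 2019-10-08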

Step 2

# Filter rows that match the condition
df_to_group = condition_example.filter(F.col("include") == True)

# Grouping on `area_id` and collecting distinct `ID`
df_to_group.groupBy("area_id").agg(F.collect_set("ID")).show()

Output

+-------+---------------+
|area_id|collect_set(ID)|
+-------+---------------+
|      B|          [id4]|
|      A|     [id2, id1]|
+-------+---------------+
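
If you only need the number of distinct IDs rather than the IDs themselves, you can aggregate with countDistinct instead of collecting the set:

# Distinct count per area_id; countDistinct avoids building the array
df_to_group.groupBy("area_id").agg(F.countDistinct("ID").alias("distinct_ids")).show()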