如何根据每日状态指标计算各状态的起止日期范围?

How do I compute a range of statuses from a daily indicator?

我有一个 df 格式为:

| name | status    | date  |
|------|-----------|-------|
| ben  | active    | 01/01 |
| ben  | active    | 01/02 |
| ben  | active    | 01/03 |
| ben  | in-active | 01/04 |
| ben  | in-active | 01/05 |
| ben  | active    | 01/06 |
| ben  | active    | 01/07 |

我需要创建一个如下格式的 df:

| name | status    | start_date | end_date |
|------|-----------|------------|----------|
| ben  | active    |   01/01    |   01/03  |
| ben  | in-active |   01/04    |   01/05  |
| ben  | active    |   01/06    |   01/07  |

我很难找到最好的方法

确定状态范围的最末端需要一些技巧,但这段代码应该可以满足您的要求。

from pyspark.sql import types as T, functions as F, SparkSession, Window
import datetime
spark = SparkSession.builder.getOrCreate()

# Explicit schema: all three columns are declared non-nullable.
schema = T.StructType([
  T.StructField(column, dtype, False)
  for column, dtype in (
    ("name", T.StringType()),
    ("status", T.StringType()),
    ("date", T.DateType()),
  )
])

# One observation per day for 2021-01-01 .. 2021-01-07, all for "ben".
daily_statuses = [
  "active", "active",
  "inactive", "inactive",
  "active", "active", "active",
]
data = [
  {"name": "ben", "status": status, "date": datetime.date(2021, 1, day)}
  for day, status in enumerate(daily_statuses, start=1)
]

df = spark.createDataFrame(data, schema)

df.show()

"""
+----+--------+----------+
|name|  status|      date|
+----+--------+----------+
| ben|  active|2021-01-01|
| ben|  active|2021-01-02|
| ben|inactive|2021-01-03|
| ben|inactive|2021-01-04|
| ben|  active|2021-01-05|
| ben|  active|2021-01-06|
| ben|  active|2021-01-07|
+----+--------+----------+
"""

# Per-name timeline: rows sharing a name, ordered chronologically.
date_window = Window.partitionBy("name").orderBy("date")

# Attach the status of the preceding row and the date of the following row
# (both are null at the respective edge of a name's timeline).
df = (
  df
  .withColumn("previous_status", F.lag("status").over(date_window))
  .withColumn("next_date", F.lead("date").over(date_window))
)

# A row is a boundary when the status differs from the previous row, when
# there is no previous row, or when there is no following row.
status_changed = F.col("status") != F.col("previous_status")
has_no_previous = F.col("previous_status").isNull()
has_no_next = F.col("next_date").isNull()
boundaries = df.where(status_changed | has_no_previous | has_no_next)

boundaries.show()

"""
+----+--------+----------+---------------+----------+
|name|  status|      date|previous_status| next_date|
+----+--------+----------+---------------+----------+
| ben|  active|2021-01-01|           null|2021-01-02|
| ben|inactive|2021-01-03|         active|2021-01-04|
| ben|  active|2021-01-05|       inactive|2021-01-06|
| ben|  active|2021-01-07|         active|      null|
+----+--------+----------+---------------+----------+
"""

# lead() now runs over the boundary rows only, so each boundary is paired
# with the date of the boundary that follows it (null on the last one).
next_boundary_date = F.lead("date").over(date_window)
computed_ends = boundaries.withColumn("maybe_end_date", next_boundary_date)

computed_ends.show()

"""
+----+--------+----------+---------------+----------+--------------+
|name|  status|      date|previous_status| next_date|maybe_end_date|
+----+--------+----------+---------------+----------+--------------+
| ben|  active|2021-01-01|           null|2021-01-02|    2021-01-03|
| ben|inactive|2021-01-03|         active|2021-01-04|    2021-01-05|
| ben|  active|2021-01-05|       inactive|2021-01-06|    2021-01-07|
| ben|  active|2021-01-07|         active|      null|          null|
+----+--------+----------+---------------+----------+--------------+
"""

# Look one boundary further ahead: a null next_end_date means the following
# boundary row has no maybe_end_date of its own (or there is no following row).
following_end_date = F.lead("maybe_end_date").over(date_window)
unbounded_end = computed_ends.withColumn("next_end_date", following_end_date)

unbounded_end.show()

"""
+----+--------+----------+---------------+----------+--------------+-------------+
|name|  status|      date|previous_status| next_date|maybe_end_date|next_end_date|
+----+--------+----------+---------------+----------+--------------+-------------+
| ben|  active|2021-01-01|           null|2021-01-02|    2021-01-03|   2021-01-05|
| ben|inactive|2021-01-03|         active|2021-01-04|    2021-01-05|   2021-01-07|
| ben|  active|2021-01-05|       inactive|2021-01-06|    2021-01-07|         null|
| ben|  active|2021-01-07|         active|      null|          null|         null|
+----+--------+----------+---------------+----------+--------------+-------------+
"""

# Turn the boundary rows into one (name, status, start_date, end_date) row per
# status run.
#
# The previous version decided the end date from next_end_date alone, which is
# only right when the last row of a name continues the preceding status. When
# the last row instead starts a new status (a one-day final run), or a name has
# a single row, that run was dropped and the run before it ended one day late.
#
# Boundary rows are either the first row of a run (previous_status is null or
# differs from status) or a pure end marker (the last row of a name when it
# continues the current run). Only run starts become output rows.
with_next_status = unbounded_end.withColumn(
  "next_status",
  F.lead("status").over(date_window)  # status of the next boundary row
)

starts_run = (
  F.col("previous_status").isNull()
  | (F.col("status") != F.col("previous_status"))
)

corrected_end = with_next_status.filter(starts_run).select(
  F.col("name"),
  F.col("status"),
  F.col("date").alias("start_date"),
  F.when(
    # No later boundary: the run starts and ends on this very row.
    F.col("maybe_end_date").isNull(),
    F.col("date")
  ).when(
    # The next boundary has the same status, so it is the end marker of this
    # run and its date is the inclusive end.
    F.col("next_status") == F.col("status"),
    F.col("maybe_end_date")
  ).otherwise(
    # The next boundary starts a different run; this run ended the day before.
    # NOTE: assumes one row per calendar day with no gaps within a name.
    F.date_sub(
      F.col("maybe_end_date"),
      1
    )
  ).alias("end_date")
)

corrected_end.show()

"""
+----+--------+----------+----------+
|name|  status|start_date|  end_date|
+----+--------+----------+----------+
| ben|  active|2021-01-01|2021-01-02|
| ben|inactive|2021-01-03|2021-01-04|
| ben|  active|2021-01-05|2021-01-07|
+----+--------+----------+----------+
"""