Merge rows keeping the latest values for each (element, timestamp) pair

I have a PySpark DataFrame containing IDs, values, and the timestamps associated with those values. Here is an example:

+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value|           value_ts|value2|          value2_ts|value3|          value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
|  1|  0.5|2022-03-15 00:00:00|  null|               null|     7|2022-03-15 00:00:00|
|  2|  0.2|2022-03-18 00:00:00|  null|               null|     5|2022-03-18 00:00:00|
|  3| null|               null|  null|               null|    12|2022-03-15 00:00:00|
|  1|  1.2|2022-03-18 00:00:00|  null|               null|  null|               null|
|  1| null|               null|   124|2022-03-10 00:00:00|     6|2022-03-10 00:00:00|
|  3| null|               null|   413|2022-03-18 00:00:00|  null|               null|
+---+-----+-------------------+------+-------------------+------+-------------------+

For this data, I want to keep, for each id, the latest value of every value/value_ts pair (grouped by id).

Since there are 3 distinct IDs in the input, I expect 3 rows in the output, like this:

+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value|           value_ts|value2|          value2_ts|value3|          value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
|  1|  1.2|2022-03-18 00:00:00|   124|2022-03-10 00:00:00|     7|2022-03-15 00:00:00|
|  2|  0.2|2022-03-18 00:00:00|  null|               null|     5|2022-03-18 00:00:00|
|  3| null|               null|   413|2022-03-18 00:00:00|    12|2022-03-15 00:00:00|
+---+-----+-------------------+------+-------------------+------+-------------------+

Can you help me get this result with PySpark?

Note that whenever a ts is null, the corresponding value is null as well. Here is the Python code to reproduce the input DataFrame:

from datetime import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("local_test").getOrCreate()
df = spark.createDataFrame([
    ["1", "0.5", datetime(2022, 3, 15), None, None, "7", datetime(2022, 3, 15)],
    ["2", "0.2", datetime(2022, 3, 18), None, None, "5", datetime(2022, 3, 18)],
    ["3", None, None, None, None, "12", datetime(2022, 3, 15)],
    ["1", "1.2", datetime(2022, 3, 18), None, None, None, None],
    ["1", None, None, "124", datetime(2022, 3, 10), "6", datetime(2022, 3, 10)],
    ["3", None, None, "413", datetime(2022, 3, 18), None, None],
],
    ["id", "value", "value_ts", "value2", "value2_ts", "value3", "value3_ts"]
)
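
To check that this matches the table above (the value columns are created as plain strings here, which is fine for this example):

df.show()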

This can be solved as follows:

  1. For each value/ts column pair, identify the latest ts per id.
  2. Copy the value identified in step 1 to all rows belonging to the same id.
  3. Finally, de-duplicate by keeping the first row of each group.

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql import DataFrame

def latest_ts(idf: DataFrame, val_col_name: str, ts_col_name: str) -> DataFrame:
    ws = W.partitionBy("id").orderBy(F.desc(ts_col_name)).rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
    # Find the latest ts within each id
    latest_ts_col = F.max(ts_col_name).over(ws)
    # Keep the value corresponding to the latest ts and make the others null
    latest_val = F.when(F.col(ts_col_name) == latest_ts_col, F.col(val_col_name)).otherwise(F.lit(None))
    # Overwrite the value and ts columns on every row of the id with the latest pair
    return (idf.withColumn(val_col_name, F.first(latest_val, ignorenulls=True).over(ws))
               .withColumn(ts_col_name, latest_ts_col))

df_latest_ts = latest_ts(latest_ts(latest_ts(df, "value", "value_ts"), "value2", "value2_ts"), "value3", "value3_ts")
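
# Optional check (not part of the original answer): after the three passes,
# every row of an id carries that id's latest value/ts for each pair, so the
# rows of an id are exact duplicates and any one of them can be kept.
# df_latest_ts.show()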


ws_rn = W.partitionBy("id").orderBy(F.desc("value_ts"), F.desc("value2_ts"), F.desc("value3_ts"))
(df_latest_ts.withColumn("rn", F.row_number().over(ws_rn))
             .where("rn == 1")
             .drop("rn")
).show()

"""
+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value|           value_ts|value2|          value2_ts|value3|          value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
|  1|  1.2|2022-03-18 00:00:00|   124|2022-03-10 00:00:00|     7|2022-03-15 00:00:00|
|  2|  0.2|2022-03-18 00:00:00|  null|               null|     5|2022-03-18 00:00:00|
|  3| null|               null|   413|2022-03-18 00:00:00|    12|2022-03-15 00:00:00|
+---+-----+-------------------+------+-------------------+------+-------------------+
"""