Merge rows, keeping the latest values for each (element, timestamp) pair
I have a PySpark dataframe containing ids, values and their associated timestamps. Here is an example:
+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value| value_ts|value2| value2_ts|value3| value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
| 1| 0.5|2022-03-15 00:00:00| null| null| 7|2022-03-15 00:00:00|
| 2| 0.2|2022-03-18 00:00:00| null| null| 5|2022-03-18 00:00:00|
| 3| null| null| null| null| 12|2022-03-15 00:00:00|
| 1| 1.2|2022-03-18 00:00:00| null| null| null| null|
| 1| null| null| 124|2022-03-10 00:00:00| 6|2022-03-10 00:00:00|
| 3| null| null| 413|2022-03-18 00:00:00| null| null|
+---+-----+-------------------+------+-------------------+------+-------------------+
For this data, I want to get, grouped by id, the latest value of each (value, value_ts) pair.
In this example we have:
- id=1:
  - the latest value_ts is 2022-03-18 00:00:00 and the corresponding value is 1.2
  - the latest value2_ts is 2022-03-10 00:00:00 and the corresponding value2 is 124
  - the latest value3_ts is 2022-03-15 00:00:00 and the corresponding value3 is 7
- id=2:
  - the latest value_ts is 2022-03-18 00:00:00 and the corresponding value is 0.2
  - the latest value2_ts is null and the corresponding value2 is null
  - the latest value3_ts is 2022-03-18 00:00:00 and the corresponding value3 is 5
- id=3:
  - the latest value_ts is null and the corresponding value is null
  - the latest value2_ts is 2022-03-18 00:00:00 and the corresponding value2 is 413
  - the latest value3_ts is 2022-03-15 00:00:00 and the corresponding value3 is 12
Since there are 3 distinct ids in the input, I expect 3 rows in the output, like this:
+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value| value_ts|value2| value2_ts|value3| value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
| 1| 1.2|2022-03-18 00:00:00| 124|2022-03-10 00:00:00| 7|2022-03-15 00:00:00|
| 2| 0.2|2022-03-18 00:00:00| null| null| 5|2022-03-18 00:00:00|
| 3| null| null| 413|2022-03-18 00:00:00| 12|2022-03-15 00:00:00|
+---+-----+-------------------+------+-------------------+------+-------------------+
Could you help me get this result with PySpark?
Note that whenever a ts is null, the corresponding value is also null.
Here is the Python code to reproduce the input dataframe:
from datetime import datetime
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("local_test").getOrCreate()
df = spark.createDataFrame([
["1", "0.5", datetime(2022, 3, 15), None, None, "7", datetime(2022, 3, 15)],
["2", "0.2", datetime(2022, 3, 18), None, None, "5", datetime(2022, 3, 18)],
["3", None, None, None, None, "12", datetime(2022, 3, 15)],
["1", "1.2", datetime(2022, 3, 18), None, None, None, None],
["1", None, None, "124", datetime(2022, 3, 10), "6", datetime(2022, 3, 10)],
["3", None, None, "413", datetime(2022, 3, 18), None, None],
],
["id", "value", "value_ts", "value2", "value2_ts", "value3", "value3_ts"]
)
This can be solved by:
1. Identifying the latest ts for each combination of value and ts columns.
2. Copying the values identified in step 1 to all rows belonging to the same id (needed because the latest value of each column pair may come from a different row).
3. Finally, de-duplicating to keep the first row of each group.
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql import DataFrame

def latest_ts(idf: DataFrame, val_col_name: str, ts_col_name: str) -> DataFrame:
    ws = W.partitionBy("id").orderBy(F.desc(ts_col_name)).rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
    # Find the latest ts within the id partition
    latest_ts = F.max(ts_col_name).over(ws)
    # Keep the value corresponding to the latest ts and make the others null
    latest_val = F.when(F.col(ts_col_name) == latest_ts, F.col(val_col_name)).otherwise(F.lit(None))
    # Override the value column and the ts column on every row with the latest values
    return idf.withColumn(val_col_name, F.first(latest_val, ignorenulls=True).over(ws)).withColumn(ts_col_name, latest_ts)

# Apply the helper once per (value, ts) column pair
df_latest_ts = latest_ts(latest_ts(latest_ts(df, "value", "value_ts"), "value2", "value2_ts"), "value3", "value3_ts")

# Every row of an id now carries the same latest values, so keep a single row per id
ws_rn = W.partitionBy("id").orderBy(F.desc("value_ts"), F.desc("value2_ts"), F.desc("value3_ts"))
(df_latest_ts.withColumn("rn", F.row_number().over(ws_rn))
    .where("rn == 1")
    .drop("rn")
).show()
"""
+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value| value_ts|value2| value2_ts|value3| value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
| 1| 1.2|2022-03-18 00:00:00| 124|2022-03-10 00:00:00| 7|2022-03-15 00:00:00|
| 2| 0.2|2022-03-18 00:00:00| null| null| 5|2022-03-18 00:00:00|
| 3| null| null| 413|2022-03-18 00:00:00| 12|2022-03-15 00:00:00|
+---+-----+-------------------+------+-------------------+------+-------------------+
"""